Skip to content

Commit 2c7c570

Browse files
committed
refactor: migrate redb service from adapter::kv to impl Access directly
1 parent b64d20b commit 2c7c570

File tree

7 files changed

+345
-143
lines changed

7 files changed

+345
-143
lines changed

core/src/services/redb/backend.rs

Lines changed: 78 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,13 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
use std::fmt::Debug;
19-
use std::fmt::Formatter;
2018
use std::sync::Arc;
2119

22-
use crate::Builder;
23-
use crate::Error;
24-
use crate::ErrorKind;
25-
use crate::Scheme;
26-
use crate::raw::adapters::kv;
20+
use super::config::RedbConfig;
21+
use super::core::*;
22+
use super::deleter::RedbDeleter;
23+
use super::writer::RedbWriter;
2724
use crate::raw::*;
28-
use crate::services::RedbConfig;
2925
use crate::*;
3026

3127
/// Redb service support.
@@ -112,159 +108,106 @@ impl Builder for RedbBuilder {
112108

113109
create_table(&db, &table_name)?;
114110

115-
Ok(RedbBackend::new(Adapter {
111+
let root = normalize_root(&self.config.root.unwrap_or_default());
112+
113+
Ok(RedbBackend::new(RedbCore {
116114
datadir,
117115
table: table_name,
118116
db,
119117
})
120-
.with_root(self.config.root.as_deref().unwrap_or_default()))
118+
.with_normalized_root(root))
121119
}
122120
}
123121

124122
/// Backend for Redb services.
125-
pub type RedbBackend = kv::Backend<Adapter>;
126-
127-
#[derive(Clone)]
128-
pub struct Adapter {
129-
datadir: Option<String>,
130-
table: String,
131-
db: Arc<redb::Database>,
123+
#[derive(Clone, Debug)]
124+
pub struct RedbBackend {
125+
core: Arc<RedbCore>,
126+
root: String,
127+
info: Arc<AccessorInfo>,
132128
}
133129

134-
impl Debug for Adapter {
135-
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
136-
let mut ds = f.debug_struct("Adapter");
137-
ds.field("path", &self.datadir);
138-
ds.finish()
130+
impl RedbBackend {
131+
pub fn new(core: RedbCore) -> Self {
132+
let info = AccessorInfo::default();
133+
info.set_scheme(Scheme::Redb.into_static());
134+
info.set_name(&core.table);
135+
info.set_root("/");
136+
info.set_native_capability(Capability {
137+
read: true,
138+
stat: true,
139+
write: true,
140+
write_can_empty: true,
141+
delete: true,
142+
shared: false,
143+
..Default::default()
144+
});
145+
146+
Self {
147+
core: Arc::new(core),
148+
root: "/".to_string(),
149+
info: Arc::new(info),
150+
}
139151
}
140-
}
141-
142-
impl kv::Adapter for Adapter {
143-
type Scanner = ();
144152

145-
fn info(&self) -> kv::Info {
146-
kv::Info::new(
147-
Scheme::Redb,
148-
&self.table,
149-
Capability {
150-
read: true,
151-
write: true,
152-
shared: false,
153-
..Default::default()
154-
},
155-
)
153+
fn with_normalized_root(mut self, root: String) -> Self {
154+
self.info.set_root(&root);
155+
self.root = root;
156+
self
156157
}
158+
}
157159

158-
async fn get(&self, path: &str) -> Result<Option<Buffer>> {
159-
let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;
160-
161-
let table_define: redb::TableDefinition<&str, &[u8]> =
162-
redb::TableDefinition::new(&self.table);
163-
164-
let table = read_txn
165-
.open_table(table_define)
166-
.map_err(parse_table_error)?;
160+
impl Access for RedbBackend {
161+
type Reader = Buffer;
162+
type Writer = RedbWriter;
163+
type Lister = ();
164+
type Deleter = oio::OneShotDeleter<RedbDeleter>;
167165

168-
let result = match table.get(path) {
169-
Ok(Some(v)) => Ok(Some(v.value().to_vec())),
170-
Ok(None) => Ok(None),
171-
Err(e) => Err(parse_storage_error(e)),
172-
}?;
173-
Ok(result.map(Buffer::from))
166+
fn info(&self) -> Arc<AccessorInfo> {
167+
self.info.clone()
174168
}
175169

176-
async fn set(&self, path: &str, value: Buffer) -> Result<()> {
177-
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
170+
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
171+
let p = build_abs_path(&self.root, path);
178172

179-
let table_define: redb::TableDefinition<&str, &[u8]> =
180-
redb::TableDefinition::new(&self.table);
181-
182-
{
183-
let mut table = write_txn
184-
.open_table(table_define)
185-
.map_err(parse_table_error)?;
186-
187-
table
188-
.insert(path, &*value.to_vec())
189-
.map_err(parse_storage_error)?;
173+
if p == build_abs_path(&self.root, "") {
174+
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
175+
} else {
176+
let bs = self.core.get(&p)?;
177+
match bs {
178+
Some(bs) => Ok(RpStat::new(
179+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
180+
)),
181+
None => Err(Error::new(ErrorKind::NotFound, "kv not found in redb")),
182+
}
190183
}
191-
192-
write_txn.commit().map_err(parse_commit_error)?;
193-
Ok(())
194184
}
195185

196-
async fn delete(&self, path: &str) -> Result<()> {
197-
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
198-
199-
let table_define: redb::TableDefinition<&str, &[u8]> =
200-
redb::TableDefinition::new(&self.table);
201-
202-
{
203-
let mut table = write_txn
204-
.open_table(table_define)
205-
.map_err(parse_table_error)?;
206-
207-
table.remove(path).map_err(parse_storage_error)?;
208-
}
209-
210-
write_txn.commit().map_err(parse_commit_error)?;
211-
Ok(())
186+
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
187+
let p = build_abs_path(&self.root, path);
188+
let bs = match self.core.get(&p)? {
189+
Some(bs) => bs,
190+
None => {
191+
return Err(Error::new(ErrorKind::NotFound, "kv not found in redb"));
192+
}
193+
};
194+
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
212195
}
213-
}
214196

215-
fn parse_transaction_error(e: redb::TransactionError) -> Error {
216-
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
217-
}
218-
219-
fn parse_table_error(e: redb::TableError) -> Error {
220-
match e {
221-
redb::TableError::TableDoesNotExist(_) => {
222-
Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
223-
}
224-
_ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
197+
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
198+
let p = build_abs_path(&self.root, path);
199+
Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p)))
225200
}
226-
}
227-
228-
fn parse_storage_error(e: redb::StorageError) -> Error {
229-
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
230-
}
231-
232-
fn parse_database_error(e: redb::DatabaseError) -> Error {
233-
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
234-
}
235-
236-
fn parse_commit_error(e: redb::CommitError) -> Error {
237-
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
238-
}
239-
240-
/// Check if a table exists, otherwise create it.
241-
fn create_table(db: &redb::Database, table: &str) -> Result<()> {
242-
// Only one `WriteTransaction` is permitted at same time,
243-
// applying new one will block until it available.
244-
//
245-
// So we first try checking table existence via `ReadTransaction`.
246-
{
247-
let read_txn = db.begin_read().map_err(parse_transaction_error)?;
248-
249-
let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);
250201

251-
match read_txn.open_table(table_define) {
252-
Ok(_) => return Ok(()),
253-
Err(redb::TableError::TableDoesNotExist(_)) => (),
254-
Err(e) => return Err(parse_table_error(e)),
255-
}
202+
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
203+
Ok((
204+
RpDelete::default(),
205+
oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone(), self.root.clone())),
206+
))
256207
}
257208

258-
{
259-
let write_txn = db.begin_write().map_err(parse_transaction_error)?;
260-
261-
let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);
262-
263-
write_txn
264-
.open_table(table_define)
265-
.map_err(parse_table_error)?;
266-
write_txn.commit().map_err(parse_commit_error)?;
209+
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
210+
let _ = build_abs_path(&self.root, path);
211+
Ok((RpList::default(), ()))
267212
}
268-
269-
Ok(())
270213
}

core/src/services/redb/config.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,25 +17,27 @@
1717

1818
use std::fmt::Debug;
1919

20-
use super::backend::RedbBuilder;
2120
use serde::Deserialize;
2221
use serde::Serialize;
2322

23+
use super::backend::RedbBuilder;
24+
2425
/// Config for redb service support.
2526
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
2627
#[serde(default)]
2728
#[non_exhaustive]
2829
pub struct RedbConfig {
2930
/// path to the redb data directory.
3031
pub datadir: Option<String>,
31-
/// The root for redb.
32-
pub root: Option<String>,
3332
/// The table name for redb.
3433
pub table: Option<String>,
34+
/// The root for redb.
35+
pub root: Option<String>,
3536
}
3637

3738
impl crate::Configurator for RedbConfig {
3839
type Builder = RedbBuilder;
40+
3941
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
4042
let mut map = uri.options().clone();
4143

0 commit comments

Comments
 (0)