Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 78 additions & 135 deletions core/src/services/redb/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::Arc;

use crate::Builder;
use crate::Error;
use crate::ErrorKind;
use crate::Scheme;
use crate::raw::adapters::kv;
use super::config::RedbConfig;
use super::core::*;
use super::deleter::RedbDeleter;
use super::writer::RedbWriter;
use crate::raw::*;
use crate::services::RedbConfig;
use crate::*;

/// Redb service support.
Expand Down Expand Up @@ -112,159 +108,106 @@ impl Builder for RedbBuilder {

create_table(&db, &table_name)?;

Ok(RedbBackend::new(Adapter {
let root = normalize_root(&self.config.root.unwrap_or_default());

Ok(RedbBackend::new(RedbCore {
datadir,
table: table_name,
db,
})
.with_root(self.config.root.as_deref().unwrap_or_default()))
.with_normalized_root(root))
}
}

/// Backend for Redb services.
pub type RedbBackend = kv::Backend<Adapter>;

#[derive(Clone)]
pub struct Adapter {
datadir: Option<String>,
table: String,
db: Arc<redb::Database>,
#[derive(Clone, Debug)]
pub struct RedbBackend {
core: Arc<RedbCore>,
root: String,
info: Arc<AccessorInfo>,
}

impl Debug for Adapter {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let mut ds = f.debug_struct("Adapter");
ds.field("path", &self.datadir);
ds.finish()
impl RedbBackend {
pub fn new(core: RedbCore) -> Self {
let info = AccessorInfo::default();
info.set_scheme(Scheme::Redb.into_static());
info.set_name(&core.table);
info.set_root("/");
info.set_native_capability(Capability {
read: true,
stat: true,
write: true,
write_can_empty: true,
delete: true,
shared: false,
..Default::default()
});

Self {
core: Arc::new(core),
root: "/".to_string(),
info: Arc::new(info),
}
}
}

impl kv::Adapter for Adapter {
type Scanner = ();

fn info(&self) -> kv::Info {
kv::Info::new(
Scheme::Redb,
&self.table,
Capability {
read: true,
write: true,
shared: false,
..Default::default()
},
)
fn with_normalized_root(mut self, root: String) -> Self {
self.info.set_root(&root);
self.root = root;
self
}
}

async fn get(&self, path: &str) -> Result<Option<Buffer>> {
let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

let table = read_txn
.open_table(table_define)
.map_err(parse_table_error)?;
impl Access for RedbBackend {
type Reader = Buffer;
type Writer = RedbWriter;
type Lister = ();
type Deleter = oio::OneShotDeleter<RedbDeleter>;

let result = match table.get(path) {
Ok(Some(v)) => Ok(Some(v.value().to_vec())),
Ok(None) => Ok(None),
Err(e) => Err(parse_storage_error(e)),
}?;
Ok(result.map(Buffer::from))
fn info(&self) -> Arc<AccessorInfo> {
self.info.clone()
}

async fn set(&self, path: &str, value: Buffer) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;
async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
let p = build_abs_path(&self.root, path);

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

{
let mut table = write_txn
.open_table(table_define)
.map_err(parse_table_error)?;

table
.insert(path, &*value.to_vec())
.map_err(parse_storage_error)?;
if p == build_abs_path(&self.root, "") {
Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
} else {
let bs = self.core.get(&p)?;
match bs {
Some(bs) => Ok(RpStat::new(
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
)),
None => Err(Error::new(ErrorKind::NotFound, "kv not found in redb")),
}
}

write_txn.commit().map_err(parse_commit_error)?;
Ok(())
}

async fn delete(&self, path: &str) -> Result<()> {
let write_txn = self.db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> =
redb::TableDefinition::new(&self.table);

{
let mut table = write_txn
.open_table(table_define)
.map_err(parse_table_error)?;

table.remove(path).map_err(parse_storage_error)?;
}

write_txn.commit().map_err(parse_commit_error)?;
Ok(())
async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
let p = build_abs_path(&self.root, path);
let bs = match self.core.get(&p)? {
Some(bs) => bs,
None => {
return Err(Error::new(ErrorKind::NotFound, "kv not found in redb"));
}
};
Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}
}

fn parse_transaction_error(e: redb::TransactionError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

fn parse_table_error(e: redb::TableError) -> Error {
match e {
redb::TableError::TableDoesNotExist(_) => {
Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
}
_ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e),
async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
let p = build_abs_path(&self.root, path);
Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p)))
}
}

fn parse_storage_error(e: redb::StorageError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

fn parse_database_error(e: redb::DatabaseError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

fn parse_commit_error(e: redb::CommitError) -> Error {
Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
}

/// Check if a table exists, otherwise create it.
fn create_table(db: &redb::Database, table: &str) -> Result<()> {
// Only one `WriteTransaction` is permitted at same time,
// applying new one will block until it available.
//
// So we first try checking table existence via `ReadTransaction`.
{
let read_txn = db.begin_read().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);

match read_txn.open_table(table_define) {
Ok(_) => return Ok(()),
Err(redb::TableError::TableDoesNotExist(_)) => (),
Err(e) => return Err(parse_table_error(e)),
}
async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
Ok((
RpDelete::default(),
oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone(), self.root.clone())),
))
}

{
let write_txn = db.begin_write().map_err(parse_transaction_error)?;

let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table);

write_txn
.open_table(table_define)
.map_err(parse_table_error)?;
write_txn.commit().map_err(parse_commit_error)?;
async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
let _ = build_abs_path(&self.root, path);
Ok((RpList::default(), ()))
}

Ok(())
}
8 changes: 5 additions & 3 deletions core/src/services/redb/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@

use std::fmt::Debug;

use super::backend::RedbBuilder;
use serde::Deserialize;
use serde::Serialize;

use super::backend::RedbBuilder;

/// Config for redb service support.
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
#[non_exhaustive]
pub struct RedbConfig {
/// path to the redb data directory.
pub datadir: Option<String>,
/// The root for redb.
pub root: Option<String>,
/// The table name for redb.
pub table: Option<String>,
/// The root for redb.
pub root: Option<String>,
}

impl crate::Configurator for RedbConfig {
type Builder = RedbBuilder;

fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
let mut map = uri.options().clone();

Expand Down
Loading
Loading