|
15 | 15 | // specific language governing permissions and limitations |
16 | 16 | // under the License. |
17 | 17 |
|
18 | | -use std::fmt::Debug; |
19 | | -use std::fmt::Formatter; |
20 | 18 | use std::sync::Arc; |
21 | 19 |
|
22 | | -use crate::Builder; |
23 | | -use crate::Error; |
24 | | -use crate::ErrorKind; |
25 | | -use crate::Scheme; |
26 | | -use crate::raw::adapters::kv; |
| 20 | +use super::config::RedbConfig; |
| 21 | +use super::core::*; |
| 22 | +use super::deleter::RedbDeleter; |
| 23 | +use super::writer::RedbWriter; |
27 | 24 | use crate::raw::*; |
28 | | -use crate::services::RedbConfig; |
29 | 25 | use crate::*; |
30 | 26 |
|
31 | 27 | /// Redb service support. |
@@ -112,159 +108,106 @@ impl Builder for RedbBuilder { |
112 | 108 |
|
113 | 109 | create_table(&db, &table_name)?; |
114 | 110 |
|
115 | | - Ok(RedbBackend::new(Adapter { |
| 111 | + let root = normalize_root(&self.config.root.unwrap_or_default()); |
| 112 | + |
| 113 | + Ok(RedbBackend::new(RedbCore { |
116 | 114 | datadir, |
117 | 115 | table: table_name, |
118 | 116 | db, |
119 | 117 | }) |
120 | | - .with_root(self.config.root.as_deref().unwrap_or_default())) |
| 118 | + .with_normalized_root(root)) |
121 | 119 | } |
122 | 120 | } |
123 | 121 |
|
124 | 122 | /// Backend for Redb services. |
125 | | -pub type RedbBackend = kv::Backend<Adapter>; |
126 | | - |
127 | | -#[derive(Clone)] |
128 | | -pub struct Adapter { |
129 | | - datadir: Option<String>, |
130 | | - table: String, |
131 | | - db: Arc<redb::Database>, |
| 123 | +#[derive(Clone, Debug)] |
| 124 | +pub struct RedbBackend { |
| 125 | + core: Arc<RedbCore>, |
| 126 | + root: String, |
| 127 | + info: Arc<AccessorInfo>, |
132 | 128 | } |
133 | 129 |
|
134 | | -impl Debug for Adapter { |
135 | | - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { |
136 | | - let mut ds = f.debug_struct("Adapter"); |
137 | | - ds.field("path", &self.datadir); |
138 | | - ds.finish() |
| 130 | +impl RedbBackend { |
| 131 | + pub fn new(core: RedbCore) -> Self { |
| 132 | + let info = AccessorInfo::default(); |
| 133 | + info.set_scheme(Scheme::Redb.into_static()); |
| 134 | + info.set_name(&core.table); |
| 135 | + info.set_root("/"); |
| 136 | + info.set_native_capability(Capability { |
| 137 | + read: true, |
| 138 | + stat: true, |
| 139 | + write: true, |
| 140 | + write_can_empty: true, |
| 141 | + delete: true, |
| 142 | + shared: false, |
| 143 | + ..Default::default() |
| 144 | + }); |
| 145 | + |
| 146 | + Self { |
| 147 | + core: Arc::new(core), |
| 148 | + root: "/".to_string(), |
| 149 | + info: Arc::new(info), |
| 150 | + } |
139 | 151 | } |
140 | | -} |
141 | | - |
142 | | -impl kv::Adapter for Adapter { |
143 | | - type Scanner = (); |
144 | 152 |
|
145 | | - fn info(&self) -> kv::Info { |
146 | | - kv::Info::new( |
147 | | - Scheme::Redb, |
148 | | - &self.table, |
149 | | - Capability { |
150 | | - read: true, |
151 | | - write: true, |
152 | | - shared: false, |
153 | | - ..Default::default() |
154 | | - }, |
155 | | - ) |
| 153 | + fn with_normalized_root(mut self, root: String) -> Self { |
| 154 | + self.info.set_root(&root); |
| 155 | + self.root = root; |
| 156 | + self |
156 | 157 | } |
| 158 | +} |
157 | 159 |
|
158 | | - async fn get(&self, path: &str) -> Result<Option<Buffer>> { |
159 | | - let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; |
160 | | - |
161 | | - let table_define: redb::TableDefinition<&str, &[u8]> = |
162 | | - redb::TableDefinition::new(&self.table); |
163 | | - |
164 | | - let table = read_txn |
165 | | - .open_table(table_define) |
166 | | - .map_err(parse_table_error)?; |
| 160 | +impl Access for RedbBackend { |
| 161 | + type Reader = Buffer; |
| 162 | + type Writer = RedbWriter; |
| 163 | + type Lister = (); |
| 164 | + type Deleter = oio::OneShotDeleter<RedbDeleter>; |
167 | 165 |
|
168 | | - let result = match table.get(path) { |
169 | | - Ok(Some(v)) => Ok(Some(v.value().to_vec())), |
170 | | - Ok(None) => Ok(None), |
171 | | - Err(e) => Err(parse_storage_error(e)), |
172 | | - }?; |
173 | | - Ok(result.map(Buffer::from)) |
| 166 | + fn info(&self) -> Arc<AccessorInfo> { |
| 167 | + self.info.clone() |
174 | 168 | } |
175 | 169 |
|
176 | | - async fn set(&self, path: &str, value: Buffer) -> Result<()> { |
177 | | - let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; |
| 170 | + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { |
| 171 | + let p = build_abs_path(&self.root, path); |
178 | 172 |
|
179 | | - let table_define: redb::TableDefinition<&str, &[u8]> = |
180 | | - redb::TableDefinition::new(&self.table); |
181 | | - |
182 | | - { |
183 | | - let mut table = write_txn |
184 | | - .open_table(table_define) |
185 | | - .map_err(parse_table_error)?; |
186 | | - |
187 | | - table |
188 | | - .insert(path, &*value.to_vec()) |
189 | | - .map_err(parse_storage_error)?; |
| 173 | + if p == build_abs_path(&self.root, "") { |
| 174 | + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) |
| 175 | + } else { |
| 176 | + let bs = self.core.get(&p)?; |
| 177 | + match bs { |
| 178 | + Some(bs) => Ok(RpStat::new( |
| 179 | + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), |
| 180 | + )), |
| 181 | + None => Err(Error::new(ErrorKind::NotFound, "kv not found in redb")), |
| 182 | + } |
190 | 183 | } |
191 | | - |
192 | | - write_txn.commit().map_err(parse_commit_error)?; |
193 | | - Ok(()) |
194 | 184 | } |
195 | 185 |
|
196 | | - async fn delete(&self, path: &str) -> Result<()> { |
197 | | - let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; |
198 | | - |
199 | | - let table_define: redb::TableDefinition<&str, &[u8]> = |
200 | | - redb::TableDefinition::new(&self.table); |
201 | | - |
202 | | - { |
203 | | - let mut table = write_txn |
204 | | - .open_table(table_define) |
205 | | - .map_err(parse_table_error)?; |
206 | | - |
207 | | - table.remove(path).map_err(parse_storage_error)?; |
208 | | - } |
209 | | - |
210 | | - write_txn.commit().map_err(parse_commit_error)?; |
211 | | - Ok(()) |
| 186 | + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { |
| 187 | + let p = build_abs_path(&self.root, path); |
| 188 | + let bs = match self.core.get(&p)? { |
| 189 | + Some(bs) => bs, |
| 190 | + None => { |
| 191 | + return Err(Error::new(ErrorKind::NotFound, "kv not found in redb")); |
| 192 | + } |
| 193 | + }; |
| 194 | + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) |
212 | 195 | } |
213 | | -} |
214 | 196 |
|
215 | | -fn parse_transaction_error(e: redb::TransactionError) -> Error { |
216 | | - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) |
217 | | -} |
218 | | - |
219 | | -fn parse_table_error(e: redb::TableError) -> Error { |
220 | | - match e { |
221 | | - redb::TableError::TableDoesNotExist(_) => { |
222 | | - Error::new(ErrorKind::NotFound, "error from redb").set_source(e) |
223 | | - } |
224 | | - _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e), |
| 197 | + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { |
| 198 | + let p = build_abs_path(&self.root, path); |
| 199 | + Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p))) |
225 | 200 | } |
226 | | -} |
227 | | - |
228 | | -fn parse_storage_error(e: redb::StorageError) -> Error { |
229 | | - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) |
230 | | -} |
231 | | - |
232 | | -fn parse_database_error(e: redb::DatabaseError) -> Error { |
233 | | - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) |
234 | | -} |
235 | | - |
236 | | -fn parse_commit_error(e: redb::CommitError) -> Error { |
237 | | - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) |
238 | | -} |
239 | | - |
240 | | -/// Check if a table exists, otherwise create it. |
241 | | -fn create_table(db: &redb::Database, table: &str) -> Result<()> { |
242 | | - // Only one `WriteTransaction` is permitted at same time, |
243 | | - // applying new one will block until it available. |
244 | | - // |
245 | | - // So we first try checking table existence via `ReadTransaction`. |
246 | | - { |
247 | | - let read_txn = db.begin_read().map_err(parse_transaction_error)?; |
248 | | - |
249 | | - let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table); |
250 | 201 |
|
251 | | - match read_txn.open_table(table_define) { |
252 | | - Ok(_) => return Ok(()), |
253 | | - Err(redb::TableError::TableDoesNotExist(_)) => (), |
254 | | - Err(e) => return Err(parse_table_error(e)), |
255 | | - } |
| 202 | + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { |
| 203 | + Ok(( |
| 204 | + RpDelete::default(), |
| 205 | + oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone(), self.root.clone())), |
| 206 | + )) |
256 | 207 | } |
257 | 208 |
|
258 | | - { |
259 | | - let write_txn = db.begin_write().map_err(parse_transaction_error)?; |
260 | | - |
261 | | - let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table); |
262 | | - |
263 | | - write_txn |
264 | | - .open_table(table_define) |
265 | | - .map_err(parse_table_error)?; |
266 | | - write_txn.commit().map_err(parse_commit_error)?; |
| 209 | + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { |
| 210 | + let _ = build_abs_path(&self.root, path); |
| 211 | + Ok((RpList::default(), ())) |
267 | 212 | } |
268 | | - |
269 | | - Ok(()) |
270 | 213 | } |
0 commit comments