diff --git a/engine_tiflash/src/lib.rs b/engine_tiflash/src/lib.rs index 2c4fdac53b2..cf3ceba1b0b 100644 --- a/engine_tiflash/src/lib.rs +++ b/engine_tiflash/src/lib.rs @@ -56,11 +56,8 @@ pub use crate::table_properties::*; mod mixed_engine; mod ps_engine; mod rocks_engine; -#[cfg(feature = "enable-pagestorage")] -pub use crate::ps_engine::ps_write_batch::*; -#[cfg(not(feature = "enable-pagestorage"))] -pub use crate::rocks_engine::write_batch::*; -pub use crate::{ps_engine::PSLogEngine, rocks_engine::db_vector}; + +pub use crate::{mixed_engine::write_batch::*, ps_engine::PSLogEngine, rocks_engine::db_vector}; pub mod mvcc_properties; pub use crate::mvcc_properties::*; diff --git a/engine_tiflash/src/mixed_engine/elementary.rs b/engine_tiflash/src/mixed_engine/elementary.rs index f849f9818f7..8e23386b326 100644 --- a/engine_tiflash/src/mixed_engine/elementary.rs +++ b/engine_tiflash/src/mixed_engine/elementary.rs @@ -3,7 +3,7 @@ use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, ReadOptions, Result}; -use super::MixedDbVector; +use super::{write_batch::MixedWriteBatch, MixedDbVector}; pub trait ElementaryEngine: std::fmt::Debug { fn put(&self, key: &[u8], value: &[u8]) -> Result<()>; @@ -33,4 +33,8 @@ pub trait ElementaryEngine: std::fmt::Debug { ) -> Result<()>; fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result; + + fn write_batch(&self) -> MixedWriteBatch; + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch; } diff --git a/engine_tiflash/src/mixed_engine/mod.rs b/engine_tiflash/src/mixed_engine/mod.rs index 26449d7ba80..588ffc994bb 100644 --- a/engine_tiflash/src/mixed_engine/mod.rs +++ b/engine_tiflash/src/mixed_engine/mod.rs @@ -1,6 +1,7 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. pub mod elementary; +pub mod write_batch; use std::{ fmt::{self, Debug, Formatter}, diff --git a/engine_tiflash/src/mixed_engine/write_batch.rs b/engine_tiflash/src/mixed_engine/write_batch.rs new file mode 100644 index 00000000000..1dbd39482ce --- /dev/null +++ b/engine_tiflash/src/mixed_engine/write_batch.rs @@ -0,0 +1,159 @@ +// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. + +#![allow(unused_variables)] +use std::sync::Arc; + +use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; +use proxy_ffi::interfaces_ffi::RawCppPtr; +use rocksdb::{WriteBatch as RawWriteBatch, DB}; +use tikv_util::Either; + +use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt}; + +pub const WRITE_BATCH_MAX_BATCH: usize = 16; +pub const WRITE_BATCH_LIMIT: usize = 16; + +pub struct MixedWriteBatch { + pub inner: + Either, +} + +impl WriteBatchExt for RocksEngine { + type WriteBatch = MixedWriteBatch; + + const WRITE_BATCH_MAX_KEYS: usize = 256; + + fn write_batch(&self) -> MixedWriteBatch { + self.element_engine.as_ref().unwrap().write_batch() + } + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { + self.element_engine + .as_ref() + .unwrap() + .write_batch_with_cap(cap) + } +} + +impl engine_traits::WriteBatch for MixedWriteBatch { + fn write_opt(&mut self, opts: &WriteOptions) -> Result { + // write into ps + match self.inner.as_mut() { + Either::Left(x) => x.write_opt(opts), + Either::Right(x) => x.write_opt(opts), + } + } + + fn data_size(&self) -> usize { + match self.inner.as_ref() { + Either::Left(x) => x.data_size(), + Either::Right(x) => x.data_size(), + } + } + + fn count(&self) -> usize { + match self.inner.as_ref() { + Either::Left(x) => x.count(), + Either::Right(x) => x.count(), + } + } + + fn is_empty(&self) -> bool { + match self.inner.as_ref() { + Either::Left(x) => x.is_empty(), + Either::Right(x) => x.is_empty(), + } + } + + fn should_write_to_engine(&self) -> bool { + // Disable TiKV's logic, and using Proxy's instead. + false + } + + fn clear(&mut self) { + match self.inner.as_mut() { + Either::Left(x) => x.clear(), + Either::Right(x) => x.clear(), + } + } + + fn set_save_point(&mut self) { + self.wbs[self.index].set_save_point(); + self.save_points.push(self.index); + } + + fn pop_save_point(&mut self) -> Result<()> { + if let Some(x) = self.save_points.pop() { + return self.wbs[x].pop_save_point().map_err(r2e); + } + Err(r2e("no save point")) + } + + fn rollback_to_save_point(&mut self) -> Result<()> { + if let Some(x) = self.save_points.pop() { + for i in x + 1..=self.index { + self.wbs[i].clear(); + } + self.index = x; + return self.wbs[x].rollback_to_save_point().map_err(r2e); + } + Err(r2e("no save point")) + } + + fn merge(&mut self, other: Self) -> Result<()> { + match self.inner.as_mut() { + Either::Left(x) => x.merge(other.left().unwrap()), + Either::Right(x) => x.merge(other.right().unwrap()), + } + } +} + +impl Mutable for MixedWriteBatch { + fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { + if !self.do_write(engine_traits::CF_DEFAULT, key) { + return Ok(()); + } + match self.inner.as_mut() { + Either::Left(x) => x.put(key, value), + Either::Right(x) => x.put(key, value), + } + } + + fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { + if !self.do_write(cf, key) { + return Ok(()); + } + match self.inner.as_mut() { + Either::Left(x) => x.put_cf(cf, key, value), + Either::Right(x) => x.put_cf(cf, key, value), + } + } + + fn delete(&mut self, key: &[u8]) -> Result<()> { + if !self.do_write(engine_traits::CF_DEFAULT, key) { + return Ok(()); + } + match self.inner.as_mut() { + Either::Left(x) => x.delete(key), + Either::Right(x) => x.delete(key), + } + } + + fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { + if !self.do_write(cf, key) { + return Ok(()); + } + match self.inner.as_mut() { + Either::Left(x) => x.delete_cf(cf, key), + Either::Right(x) => x.delete_cf(cf, key), + } + } + + fn delete_range(&mut self, begin_key: &[u8], end_key: &[u8]) -> Result<()> { + Ok(()) + } + + fn delete_range_cf(&mut self, cf: &str, begin_key: &[u8], end_key: &[u8]) -> Result<()> { + Ok(()) + } +} diff --git a/engine_tiflash/src/proxy_utils/engine_ext.rs b/engine_tiflash/src/proxy_utils/engine_ext.rs index 76ea6d28a57..d3dabdc042a 100644 --- a/engine_tiflash/src/proxy_utils/engine_ext.rs +++ b/engine_tiflash/src/proxy_utils/engine_ext.rs @@ -68,11 +68,11 @@ impl PageStorageExt { pub fn read_page(&self, page_id: &[u8]) -> Option> { // TODO maybe we can steal memory from C++ here to reduce redundant copy? let value = self.helper().read_page(page_id.into()); - return if value.view.len == 0 { + if value.view.len == 0 { None } else { Some(value.view.to_slice().to_vec()) - }; + } } #[allow(clippy::type_complexity)] diff --git a/engine_tiflash/src/proxy_utils/mod.rs b/engine_tiflash/src/proxy_utils/mod.rs index 929dd73153c..0252b47f8df 100644 --- a/engine_tiflash/src/proxy_utils/mod.rs +++ b/engine_tiflash/src/proxy_utils/mod.rs @@ -9,7 +9,9 @@ pub use engine_ext::*; mod proxy_ext; pub use proxy_ext::*; -use crate::util::get_cf_handle; +use crate::{ + mixed_engine::write_batch::MixedWriteBatch as RocksWriteBatchVec, util::get_cf_handle, +}; pub fn do_write(cf: &str, key: &[u8]) -> bool { fail::fail_point!("before_tiflash_do_write", |_| true); @@ -22,7 +24,7 @@ pub fn do_write(cf: &str, key: &[u8]) -> bool { } } -pub fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str { +pub fn cf_to_name(batch: &RocksWriteBatchVec, cf: u32) -> &'static str { // d 0 w 2 l 1 let handle_default = get_cf_handle(batch.db.as_ref(), engine_traits::CF_DEFAULT).unwrap(); let d = handle_default.id(); @@ -42,7 +44,7 @@ pub fn cf_to_name(batch: &crate::RocksWriteBatchVec, cf: u32) -> &'static str { } #[cfg(any(test, feature = "testexport"))] -pub fn check_double_write(batch: &crate::RocksWriteBatchVec) { +pub fn check_double_write(batch: &RocksWriteBatchVec) { // It will fire if we write by both observer(compat_old_proxy is not enabled) // and TiKV's WriteBatch. fail::fail_point!("before_tiflash_check_double_write", |_| {}); @@ -61,9 +63,9 @@ pub fn check_double_write(batch: &crate::RocksWriteBatchVec) { } } #[cfg(not(any(test, feature = "testexport")))] -pub fn check_double_write(_: &crate::RocksWriteBatchVec) {} +pub fn check_double_write(_: &RocksWriteBatchVec) {} -pub fn log_check_double_write(batch: &crate::RocksWriteBatchVec) -> bool { +pub fn log_check_double_write(batch: &RocksWriteBatchVec) -> bool { check_double_write(batch); // TODO(tiflash) re-support this tracker. let mut e = true; diff --git a/engine_tiflash/src/ps_engine/engine.rs b/engine_tiflash/src/ps_engine/engine.rs index 82b732a66b4..20bddde4537 100644 --- a/engine_tiflash/src/ps_engine/engine.rs +++ b/engine_tiflash/src/ps_engine/engine.rs @@ -1,13 +1,18 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. +use std::sync::Arc; + use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, ReadOptions, Result}; +use super::{PSEngineWriteBatch, PSRocksWriteBatchVec}; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, - PageStorageExt, + MixedWriteBatch, PageStorageExt, }; - +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; +use tikv_util::Either; #[derive(Clone, Debug)] pub struct PSElementEngine { pub ps_ext: PageStorageExt, @@ -90,6 +95,29 @@ impl ElementaryEngine for PSElementEngine { panic!("iterator_opt should not be called in PS engine"); r } + + fn write_batch(&self) -> MixedWriteBatch { + MixedWriteBatch { + inner: Either::Right(PSRocksWriteBatchVec::new( + Arc::clone(self.as_inner()), + self.ps_ext.clone(), + self.ps_ext.as_ref().unwrap().create_write_batch(), + WRITE_BATCH_LIMIT, + 1, + self.support_multi_batch_write(), + )), + } + } + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { + MixedWriteBatch { + inner: Either::Right(PSRocksWriteBatchVec::with_unit_capacity( + self, + self.ps_ext.as_ref().unwrap().create_write_batch(), + cap, + )), + } + } } // Some data may be migrated from kv engine to raft engine in the future, diff --git a/engine_tiflash/src/ps_engine/mod.rs b/engine_tiflash/src/ps_engine/mod.rs index b55ef4ee727..3a053e84ce6 100644 --- a/engine_tiflash/src/ps_engine/mod.rs +++ b/engine_tiflash/src/ps_engine/mod.rs @@ -2,10 +2,8 @@ mod engine; mod ps_log_engine; -#[cfg(feature = "enable-pagestorage")] pub(crate) mod ps_write_batch; pub use engine::*; pub use ps_log_engine::*; -#[cfg(feature = "enable-pagestorage")] pub use ps_write_batch::*; diff --git a/engine_tiflash/src/ps_engine/ps_write_batch.rs b/engine_tiflash/src/ps_engine/ps_write_batch.rs index 98ec8dca9b8..eebf544a983 100644 --- a/engine_tiflash/src/ps_engine/ps_write_batch.rs +++ b/engine_tiflash/src/ps_engine/ps_write_batch.rs @@ -7,38 +7,16 @@ use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; use proxy_ffi::interfaces_ffi::RawCppPtr; use rocksdb::{WriteBatch as RawWriteBatch, DB}; -use crate::{engine::RocksEngine, ps_engine::add_prefix, r2e, PageStorageExt}; +use crate::{ + engine::RocksEngine, mixed_engine::elementary::ElementaryEngine, ps_engine::add_prefix, r2e, + PageStorageExt, +}; -const WRITE_BATCH_MAX_BATCH: usize = 16; -const WRITE_BATCH_LIMIT: usize = 16; - -impl WriteBatchExt for RocksEngine { - type WriteBatch = RocksWriteBatchVec; - - const WRITE_BATCH_MAX_KEYS: usize = 256; - - fn write_batch(&self) -> RocksWriteBatchVec { - RocksWriteBatchVec::new( - Arc::clone(self.as_inner()), - self.ps_ext.clone(), - self.ps_ext.as_ref().unwrap().create_write_batch(), - WRITE_BATCH_LIMIT, - 1, - self.support_multi_batch_write(), - ) - } - - fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { - RocksWriteBatchVec::with_unit_capacity( - self, - self.ps_ext.as_ref().unwrap().create_write_batch(), - cap, - ) - } -} +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; /// Used when impl WriteBatchExt. -/// `RocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which +/// `PSRocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could /// help to deal with these small WriteBatch when it is calling /// `MultiBatchCommit` and wait the front writer to finish writing. @@ -46,7 +24,7 @@ impl WriteBatchExt for RocksEngine { /// `pipelined_write` when TiKV writes very large data into RocksDB. /// We will remove this feature when `unordered_write` of RocksDB becomes more /// stable and becomes compatible with Titan. -pub struct RocksWriteBatchVec { +pub struct PSRocksWriteBatchVec { pub db: Arc, pub wbs: Vec, pub ps_ext: Option, @@ -57,7 +35,7 @@ pub struct RocksWriteBatchVec { support_write_batch_vec: bool, } -impl Drop for RocksWriteBatchVec { +impl Drop for PSRocksWriteBatchVec { fn drop(&mut self) { if !self.ps_wb.ptr.is_null() { self.ps_ext @@ -69,7 +47,7 @@ impl Drop for RocksWriteBatchVec { } } -impl RocksWriteBatchVec { +impl PSRocksWriteBatchVec { pub fn new( db: Arc, ps_ext: Option, @@ -77,9 +55,9 @@ impl RocksWriteBatchVec { batch_size_limit: usize, cap: usize, support_write_batch_vec: bool, - ) -> RocksWriteBatchVec { + ) -> PSRocksWriteBatchVec { let wb = RawWriteBatch::with_capacity(cap); - RocksWriteBatchVec { + PSRocksWriteBatchVec { db, wbs: vec![wb], ps_ext, @@ -95,7 +73,7 @@ impl RocksWriteBatchVec { engine: &RocksEngine, ps_wb: RawCppPtr, cap: usize, - ) -> RocksWriteBatchVec { + ) -> PSRocksWriteBatchVec { Self::new( engine.as_inner().clone(), engine.ps_ext.clone(), @@ -131,7 +109,7 @@ impl RocksWriteBatchVec { } } -impl engine_traits::WriteBatch for RocksWriteBatchVec { +impl engine_traits::WriteBatch for PSRocksWriteBatchVec { fn write_opt(&mut self, opts: &WriteOptions) -> Result { // write into ps self.ps_ext @@ -209,17 +187,14 @@ impl engine_traits::WriteBatch for RocksWriteBatchVec { } } -impl RocksWriteBatchVec { +impl PSRocksWriteBatchVec { fn do_write(&self, cf: &str, key: &[u8]) -> bool { crate::do_write(cf, key) } } -impl Mutable for RocksWriteBatchVec { +impl Mutable for PSRocksWriteBatchVec { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.ps_ext.as_ref().unwrap().write_batch_put_page( self.ps_wb.ptr, add_prefix(key).as_slice(), @@ -229,9 +204,6 @@ impl Mutable for RocksWriteBatchVec { } fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.ps_ext.as_ref().unwrap().write_batch_put_page( self.ps_wb.ptr, add_prefix(key).as_slice(), @@ -241,9 +213,6 @@ impl Mutable for RocksWriteBatchVec { } fn delete(&mut self, key: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.ps_ext .as_ref() .unwrap() @@ -252,9 +221,6 @@ impl Mutable for RocksWriteBatchVec { } fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.ps_ext .as_ref() .unwrap() @@ -318,7 +284,7 @@ mod tests { assert!(v.is_some()); assert_eq!(v.unwrap(), b"bbb"); - let mut wb = RocksWriteBatchVec::with_unit_capacity(&engine, 1024); + let mut wb = PSRocksWriteBatchVec::with_unit_capacity(&engine, 1024); for _i in 0..RocksEngine::WRITE_BATCH_MAX_KEYS { wb.put(b"aaa", b"bbb").unwrap(); } @@ -358,7 +324,7 @@ mod tests { assert!(!wb.should_write_to_engine()); wb.put(b"aaa", b"bbb").unwrap(); assert!(wb.should_write_to_engine()); - let mut wb = RocksWriteBatchVec::with_unit_capacity(&engine, 1024); + let mut wb = PSRocksWriteBatchVec::with_unit_capacity(&engine, 1024); for _i in 0..WRITE_BATCH_MAX_BATCH * WRITE_BATCH_LIMIT { wb.put(b"aaa", b"bbb").unwrap(); } diff --git a/engine_tiflash/src/raft_engine.rs b/engine_tiflash/src/raft_engine.rs index f6adddc3782..5bfecd0cb08 100644 --- a/engine_tiflash/src/raft_engine.rs +++ b/engine_tiflash/src/raft_engine.rs @@ -16,7 +16,7 @@ use protobuf::Message; use raft::eraftpb::Entry; use tikv_util::{box_err, box_try}; -use crate::{util, RocksEngine, RocksWriteBatchVec}; +use crate::{mixed_engine::write_batch::MixedWriteBatch as RocksWriteBatchVec, util, RocksEngine}; impl RaftEngineReadOnly for RocksEngine { fn get_raft_state(&self, raft_group_id: u64) -> Result> { diff --git a/engine_tiflash/src/rocks_engine/engine.rs b/engine_tiflash/src/rocks_engine/engine.rs index 33a83637744..0b40f795646 100644 --- a/engine_tiflash/src/rocks_engine/engine.rs +++ b/engine_tiflash/src/rocks_engine/engine.rs @@ -1,11 +1,14 @@ // Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0. #![allow(unused_variables)] - +use std::sync::Arc; use engine_rocks::RocksEngineIterator; use engine_traits::{IterOptions, Iterable, Peekable, ReadOptions, Result}; use rocksdb::Writable; - +use tikv_util::Either; +use crate::MixedWriteBatch; +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; use crate::{ mixed_engine::{elementary::ElementaryEngine, MixedDbVector}, r2e, @@ -80,4 +83,21 @@ impl ElementaryEngine for RocksElementEngine { fn iterator_opt(&self, cf: &str, opts: IterOptions) -> Result { self.rocks.iterator_opt(cf, opts) } + + fn write_batch(&self) -> MixedWriteBatch { + MixedWriteBatch { + inner: Either::Left(super::RocksWriteBatchVec::new( + Arc::clone(self.as_inner()), + WRITE_BATCH_LIMIT, + 1, + self.support_multi_batch_write(), + )), + } + } + + fn write_batch_with_cap(&self, cap: usize) -> MixedWriteBatch { + MixedWriteBatch { + inner: Either::Left(super::RocksWriteBatchVec::with_unit_capacity(self, cap)), + } + } } diff --git a/engine_tiflash/src/rocks_engine/mod.rs b/engine_tiflash/src/rocks_engine/mod.rs index 3597401ae74..e1379ac8373 100644 --- a/engine_tiflash/src/rocks_engine/mod.rs +++ b/engine_tiflash/src/rocks_engine/mod.rs @@ -3,9 +3,7 @@ mod engine; pub use engine::*; -#[cfg(not(feature = "enable-pagestorage"))] pub mod write_batch; -#[cfg(not(feature = "enable-pagestorage"))] pub use write_batch::*; pub mod db_vector; diff --git a/engine_tiflash/src/rocks_engine/write_batch.rs b/engine_tiflash/src/rocks_engine/write_batch.rs index 2c6dd8e6bbb..f74e4afc698 100644 --- a/engine_tiflash/src/rocks_engine/write_batch.rs +++ b/engine_tiflash/src/rocks_engine/write_batch.rs @@ -7,29 +7,13 @@ use std::sync::Arc; use engine_traits::{self, Mutable, Result, WriteBatchExt, WriteOptions}; use rocksdb::{Writable, WriteBatch as RawWriteBatch, DB}; -use crate::{options::RocksWriteOptions, r2e, util::get_cf_handle, RocksEngine}; +use crate::{ + mixed_engine::elementary::ElementaryEngine, options::RocksWriteOptions, + ps_engine::PSElementEngine, r2e, util::get_cf_handle, RocksEngine, +}; -const WRITE_BATCH_MAX_BATCH: usize = 16; -const WRITE_BATCH_LIMIT: usize = 16; - -impl WriteBatchExt for RocksEngine { - type WriteBatch = RocksWriteBatchVec; - - const WRITE_BATCH_MAX_KEYS: usize = 256; - - fn write_batch(&self) -> RocksWriteBatchVec { - RocksWriteBatchVec::new( - Arc::clone(self.as_inner()), - WRITE_BATCH_LIMIT, - 1, - self.support_multi_batch_write(), - ) - } - - fn write_batch_with_cap(&self, cap: usize) -> RocksWriteBatchVec { - RocksWriteBatchVec::with_unit_capacity(self, cap) - } -} +use crate::mixed_engine::write_batch::WRITE_BATCH_LIMIT; +use crate::mixed_engine::write_batch::WRITE_BATCH_MAX_BATCH; /// `RocksWriteBatchVec` is for method `MultiBatchWrite` of RocksDB, which /// splits a large WriteBatch into many smaller ones and then any thread could @@ -205,34 +189,22 @@ impl RocksWriteBatchVec { impl Mutable for RocksWriteBatchVec { fn put(&mut self, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.check_switch_batch(); self.wbs[self.index].put(key, value).map_err(r2e) } fn put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.check_switch_batch(); let handle = get_cf_handle(self.db.as_ref(), cf)?; self.wbs[self.index].put_cf(handle, key, value).map_err(r2e) } fn delete(&mut self, key: &[u8]) -> Result<()> { - if !self.do_write(engine_traits::CF_DEFAULT, key) { - return Ok(()); - } self.check_switch_batch(); self.wbs[self.index].delete(key).map_err(r2e) } fn delete_cf(&mut self, cf: &str, key: &[u8]) -> Result<()> { - if !self.do_write(cf, key) { - return Ok(()); - } self.check_switch_batch(); let handle = get_cf_handle(self.db.as_ref(), cf)?; self.wbs[self.index].delete_cf(handle, key).map_err(r2e)