diff --git a/.github/workflows/ci-test.sh b/.github/workflows/ci-test.sh
new file mode 100755
index 00000000000..87020d6b2ab
--- /dev/null
+++ b/.github/workflows/ci-test.sh
@@ -0,0 +1,74 @@
+if [ ${CLEAN:-0} -ne 0 ]; then
+ cargo clean
+fi
+
+TEST_THREAD=
+
+if [ ${GENERATE_COV:-0} -ne 0 ]; then
+ export RUST_BACKTRACE=1
+ export RUSTFLAGS="-Zinstrument-coverage"
+ export LLVM_PROFILE_FILE="tidb-engine-ext-%p-%m.profraw"
+ rustup component list | grep "llvm-tools-preview-x86_64-unknown-linux-gnu (installed)"
+ if [ $? -ne 0 ]; then
+ rustup component add llvm-tools-preview
+ fi
+ cargo install --list | grep grcov
+ if [ $? -ne 0 ]; then
+ cargo install grcov
+ fi
+ export TEST_THREAD="--test-threads 1"
+ find . -name "*.profraw" -type f -delete
+fi
+
+cargo test --package tests --test failpoints -- cases::test_normal $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_bootstrap $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_compact_log $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_early_apply $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_encryption $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_pd_client $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_pending_peers $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_transaction $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_cmd_epoch_checker $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_disk_full $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_stale_peer $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_import_service $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_split_region --skip test_report_approximate_size_after_split_check $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_snap $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_merge $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_replica_read $TEST_THREAD && \
+# TiFlash do not support stale read currently
+#cargo test --package tests --test failpoints -- cases::test_replica_stale_read $TEST_THREAD && \
+cargo test --package tests --test failpoints -- cases::test_server $TEST_THREAD
+
+cargo test --package tests --test integrations -- raftstore::test_bootstrap $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_clear_stale_data $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_compact_after_delete $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_compact_log $TEST_THREAD && \
+## Sometimes fails
+#cargo test --package tests --test integrations -- raftstore::test_conf_change $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_early_apply $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_hibernate $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_joint_consensus $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_replica_read $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_snap $TEST_THREAD && \
+# Sometimes fails
+#cargo test --package tests --test integrations -- raftstore::test_split_region $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_stale_peer $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_status_command $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_prevote $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_region_change_observer $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_region_heartbeat $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_region_info_accessor $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_transfer_leader $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_single $TEST_THREAD && \
+# Sometimes fails
+cargo test --package tests --test integrations -- raftstore::test_merge $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_tombstone $TEST_THREAD && \
+cargo test --package tests --test integrations -- server::kv_service::test_read_index_check_memory_locks $TEST_THREAD && \
+cargo test --package tests --test integrations -- raftstore::test_batch_read_index $TEST_THREAD && \
+cargo test --package tests --test integrations -- import::test_sst_service::test_upload_sst $TEST_THREAD && \
+
+
+if [ ${GENERATE_COV:-0} -ne 0 ]; then
+ grcov . --binary-path target/debug/ . -t html --branch --ignore-not-existing -o ./coverage/
+fi
\ No newline at end of file
diff --git a/.github/workflows/pr-ci.yml b/.github/workflows/pr-ci.yml
index f362cde3af6..d23dc2fc47d 100644
--- a/.github/workflows/pr-ci.yml
+++ b/.github/workflows/pr-ci.yml
@@ -55,18 +55,4 @@ jobs:
# export RUSTC_WRAPPER=~/.cargo/bin/sccache
# make test
# make debug
- cargo check
- cargo test --package tests --test failpoints cases::test_normal
- cargo test --package tests --test failpoints cases::test_bootstrap
- cargo test --package tests --test failpoints cases::test_compact_log
- cargo test --package tests --test failpoints cases::test_early_apply
- cargo test --package tests --test failpoints cases::test_encryption
- cargo test --package tests --test failpoints cases::test_pd_client
- cargo test --package tests --test failpoints cases::test_pending_peers
- cargo test --package tests --test failpoints cases::test_transaction
- cargo test --package tests --test failpoints cases::test_cmd_epoch_checker
- cargo test --package tests --test failpoints cases::test_disk_full
- cargo test --package tests --test failpoints cases::test_snap
- cargo test --package tests --test failpoints cases::test_merge
- cargo test --package tests --test failpoints cases::test_stale_peer
- cargo test --package tests --test failpoints cases::test_import_service
+ CLEAN=1 GENERATE_COV=0 sh .github/workflows/ci-test.sh
diff --git a/.gitignore b/.gitignore
index af89a9bef28..9a29ae138d1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -39,4 +39,4 @@ fuzz-incremental/
/last_tikv.toml
/raft/
core.*
-
+*.profraw
diff --git a/components/raftstore/src/engine_store_ffi/mod.rs b/components/raftstore/src/engine_store_ffi/mod.rs
index c692e236dc2..18661725b25 100644
--- a/components/raftstore/src/engine_store_ffi/mod.rs
+++ b/components/raftstore/src/engine_store_ffi/mod.rs
@@ -65,7 +65,7 @@ impl RaftStoreProxy {
}
impl RaftStoreProxyPtr {
- unsafe fn as_ref(&self) -> &RaftStoreProxy {
+ pub unsafe fn as_ref(&self) -> &RaftStoreProxy {
&*(self.inner as *const RaftStoreProxy)
}
pub fn is_null(&self) -> bool {
@@ -544,6 +544,11 @@ fn get_engine_store_server_helper() -> &'static EngineStoreServerHelper {
gen_engine_store_server_helper(unsafe { ENGINE_STORE_SERVER_HELPER_PTR })
}
+#[cfg(feature = "test-raftstore-proxy")]
+pub fn get_engine_store_server_helper_ptr() -> isize {
+ unsafe { ENGINE_STORE_SERVER_HELPER_PTR }
+}
+
pub fn gen_engine_store_server_helper(
engine_store_server_helper: isize,
) -> &'static EngineStoreServerHelper {
diff --git a/components/raftstore/src/store/fsm/apply.rs b/components/raftstore/src/store/fsm/apply.rs
index 780ed7ce237..338455989d9 100644
--- a/components/raftstore/src/store/fsm/apply.rs
+++ b/components/raftstore/src/store/fsm/apply.rs
@@ -932,6 +932,14 @@ where
break;
}
+ if cfg!(feature = "test-raftstore-proxy") {
+ // Since `expect_index != entry.get_index()` may occasionally fail, add this log to gather log if it fails.
+ debug!(
+ "currently apply_state is {:?} entry index {}",
+ self.apply_state,
+ entry.get_index()
+ );
+ }
let expect_index = self.apply_state.get_applied_index() + 1;
if expect_index != entry.get_index() {
panic!(
@@ -1494,10 +1502,35 @@ where
ApplyResult,
EngineStoreApplyRes,
)> {
+ fail_point!(
+ "on_apply_write_cmd",
+ cfg!(release) || self.id() == 3,
+ |_| {
+ unimplemented!();
+ }
+ );
const NONE_STR: &str = "";
let requests = req.get_requests();
let mut ssts = vec![];
let mut cmds = WriteCmds::with_capacity(requests.len());
+ let resp = if cfg!(feature = "test-raftstore-proxy") {
+ let mut responses = Vec::with_capacity(requests.len());
+ for req in requests {
+ let mut r = Response::default();
+ r.set_cmd_type(req.get_cmd_type());
+ responses.push(r);
+ }
+
+ let mut resp = RaftCmdResponse::default();
+ if !req.get_header().get_uuid().is_empty() {
+ let uuid = req.get_header().get_uuid().to_vec();
+ resp.mut_header().set_uuid(uuid);
+ }
+ resp.set_responses(responses.into());
+ resp
+ } else {
+ RaftCmdResponse::new()
+ };
for req in requests {
let cmd_type = req.get_cmd_type();
match cmd_type {
@@ -1509,6 +1542,9 @@ where
self.metrics.size_diff_hint += key.len() as i64 + value.len() as i64;
self.metrics.written_bytes += key.len() as u64 + value.len() as u64;
self.metrics.written_keys += 1;
+ } else {
+ self.metrics.lock_cf_written_bytes += key.len() as u64;
+ self.metrics.lock_cf_written_bytes += value.len() as u64;
}
cmds.push(key, value, WriteCmdType::Put, cf);
}
@@ -1521,6 +1557,8 @@ where
self.metrics.delete_keys_hint += 1;
self.metrics.written_bytes += key.len() as u64;
self.metrics.written_keys += 1;
+ } else {
+ self.metrics.lock_cf_written_bytes += key.len() as u64;
}
cmds.push(key, NONE_STR.as_ref(), WriteCmdType::Del, cf);
}
@@ -1564,11 +1602,7 @@ where
"pending_ssts" => ?self.pending_clean_ssts
);
- Ok((
- RaftCmdResponse::new(),
- ApplyResult::None,
- EngineStoreApplyRes::None,
- ))
+ Ok((resp, ApplyResult::None, EngineStoreApplyRes::None))
}
EngineStoreApplyRes::NotFound | EngineStoreApplyRes::Persist => {
ssts.append(&mut self.pending_clean_ssts);
@@ -1582,7 +1616,7 @@ where
);
ctx.delete_ssts.append(&mut ssts.clone());
Ok((
- RaftCmdResponse::new(),
+ resp,
ApplyResult::Res(ExecResult::IngestSst { ssts }),
EngineStoreApplyRes::Persist,
))
@@ -1599,7 +1633,7 @@ where
),
)
};
- Ok((RaftCmdResponse::new(), ApplyResult::None, flash_res))
+ Ok((resp, ApplyResult::None, flash_res))
};
}
}
@@ -1883,14 +1917,14 @@ where
match change_type {
ConfChangeType::AddNode => {
- let add_ndoe_fp = || {
+ let add_node_fp = || {
fail_point!(
"apply_on_add_node_1_2",
self.id == 2 && self.region_id() == 1,
|_| {}
)
};
- add_ndoe_fp();
+ add_node_fp();
PEER_ADMIN_CMD_COUNTER_VEC
.with_label_values(&["add_peer", "all"])
diff --git a/components/raftstore/src/store/fsm/store.rs b/components/raftstore/src/store/fsm/store.rs
index 6d31033f738..9fbf5421860 100644
--- a/components/raftstore/src/store/fsm/store.rs
+++ b/components/raftstore/src/store/fsm/store.rs
@@ -1408,7 +1408,14 @@ impl RaftBatchSystem {
fail_point!("after_shutdown_apply");
self.system.shutdown();
if let Some(h) = handle {
- h.join().unwrap();
+ if cfg!(feature = "test-raftstore-proxy") {
+ let res = h.join();
+ if res.is_err() {
+ debug!("thread shutdown with error {:?}", res.err());
+ }
+ } else {
+ h.join().unwrap();
+ }
}
workers.coprocessor_host.shutdown();
workers.cleanup_worker.stop();
diff --git a/components/test_raftstore/src/cluster.rs b/components/test_raftstore/src/cluster.rs
index 9257b899ab3..3d827b8a8d4 100644
--- a/components/test_raftstore/src/cluster.rs
+++ b/components/test_raftstore/src/cluster.rs
@@ -38,8 +38,9 @@ use tikv_util::thread_group::GroupProperties;
use tikv_util::HandyRwLock;
use super::*;
+use mock_engine_store::make_new_region;
use mock_engine_store::EngineStoreServerWrap;
-use std::sync::atomic::{AtomicBool, AtomicU8};
+use std::sync::atomic::AtomicU8;
use tikv_util::sys::SysQuota;
use tikv_util::time::ThreadReadId;
@@ -160,7 +161,50 @@ pub struct Cluster {
pub sim: Arc>,
pub pd_client: Arc,
pub ffi_helper_set: HashMap,
- pub global_engine_helper_set: Option,
+}
+
+static mut GLOBAL_ENGINE_HELPER_SET: Option = None;
+static START: std::sync::Once = std::sync::Once::new();
+
+pub unsafe fn get_global_engine_helper_set() -> &'static Option {
+ &GLOBAL_ENGINE_HELPER_SET
+}
+
+fn make_global_ffi_helper_set_no_bind() -> (EngineHelperSet, *const u8) {
+ let mut engine_store_server = Box::new(mock_engine_store::EngineStoreServer::new(99999, None));
+ let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new(
+ &mut *engine_store_server,
+ None,
+ 0,
+ ));
+ let engine_store_server_helper = Box::new(mock_engine_store::gen_engine_store_server_helper(
+ std::pin::Pin::new(&*engine_store_server_wrap),
+ ));
+ let ptr = &*engine_store_server_helper
+ as *const raftstore::engine_store_ffi::EngineStoreServerHelper as *mut u8;
+ // Will mutate ENGINE_STORE_SERVER_HELPER_PTR
+ (
+ EngineHelperSet {
+ engine_store_server,
+ engine_store_server_wrap,
+ engine_store_server_helper,
+ },
+ ptr,
+ )
+}
+
+pub fn init_global_ffi_helper_set() {
+ unsafe {
+ START.call_once(|| {
+ assert_eq!(
+ raftstore::engine_store_ffi::get_engine_store_server_helper_ptr(),
+ 0
+ );
+ let (set, ptr) = make_global_ffi_helper_set_no_bind();
+ raftstore::engine_store_ffi::init_engine_store_server_helper(ptr);
+ GLOBAL_ENGINE_HELPER_SET = Some(set);
+ });
+ }
}
impl Cluster {
@@ -188,7 +232,6 @@ impl Cluster {
sim,
pd_client,
ffi_helper_set: HashMap::default(),
- global_engine_helper_set: None,
}
}
@@ -242,40 +285,13 @@ impl Cluster {
}
}
- pub fn make_global_ffi_helper_set(&mut self) {
- let mut engine_store_server =
- Box::new(mock_engine_store::EngineStoreServer::new(99999, None));
- let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new(
- &mut *engine_store_server,
- None,
- self as *const Cluster as isize,
- ));
- let engine_store_server_helper =
- Box::new(mock_engine_store::gen_engine_store_server_helper(
- std::pin::Pin::new(&*engine_store_server_wrap),
- ));
-
- unsafe {
- raftstore::engine_store_ffi::init_engine_store_server_helper(
- &*engine_store_server_helper
- as *const raftstore::engine_store_ffi::EngineStoreServerHelper
- as *mut u8,
- );
- }
-
- self.global_engine_helper_set = Some(EngineHelperSet {
- engine_store_server,
- engine_store_server_wrap,
- engine_store_server_helper,
- });
- }
-
- pub fn make_ffi_helper_set(
- &mut self,
+ pub fn make_ffi_helper_set_no_bind(
id: u64,
engines: Engines,
key_mgr: &Option>,
router: &RaftRouter,
+ mut node_cfg: TiKvConfig,
+ cluster_id: isize,
) -> (FFIHelperSet, TiKvConfig) {
let proxy = Box::new(raftstore::engine_store_ffi::RaftStoreProxy {
status: AtomicU8::new(raftstore::engine_store_ffi::RaftProxyStatus::Idle as u8),
@@ -294,14 +310,13 @@ impl Cluster {
let engine_store_server_wrap = Box::new(mock_engine_store::EngineStoreServerWrap::new(
&mut *engine_store_server,
Some(&mut *proxy_helper),
- self as *const Cluster as isize,
+ cluster_id,
));
let engine_store_server_helper =
Box::new(mock_engine_store::gen_engine_store_server_helper(
std::pin::Pin::new(&*engine_store_server_wrap),
));
- let mut node_cfg = self.cfg.clone();
let helper_sz = &*engine_store_server_helper as *const _ as isize;
node_cfg.raft_store.engine_store_server_helper = helper_sz;
let ffi_helper_set = FFIHelperSet {
@@ -314,9 +329,25 @@ impl Cluster {
(ffi_helper_set, node_cfg)
}
- pub fn start(&mut self) -> ServerResult<()> {
- self.make_global_ffi_helper_set();
+ pub fn make_ffi_helper_set(
+ &mut self,
+ id: u64,
+ engines: Engines,
+ key_mgr: &Option>,
+ router: &RaftRouter,
+ ) -> (FFIHelperSet, TiKvConfig) {
+ Cluster::::make_ffi_helper_set_no_bind(
+ id,
+ engines,
+ key_mgr,
+ router,
+ self.cfg.clone(),
+ self as *const Cluster as isize,
+ )
+ }
+ pub fn start(&mut self) -> ServerResult<()> {
+ init_global_ffi_helper_set();
// Try recover from last shutdown.
let node_ids: Vec = self.engines.iter().map(|(&id, _)| id).collect();
for node_id in node_ids {
@@ -335,7 +366,7 @@ impl Cluster {
let props = GroupProperties::default();
tikv_util::thread_group::set_properties(Some(props.clone()));
- let (mut ffi_helper_set, mut node_cfg) =
+ let (mut ffi_helper_set, node_cfg) =
self.make_ffi_helper_set(0, self.dbs.last().unwrap().clone(), &key_mgr, &router);
let mut sim = self.sim.wl();
@@ -1069,10 +1100,8 @@ impl Cluster {
pub fn must_put_cf(&mut self, cf: &str, key: &[u8], value: &[u8]) {
match self.batch_put(key, vec![new_put_cf_cmd(cf, key, value)]) {
Ok(resp) => {
- if cfg!(feature = "test-raftstore-proxy") {
- assert_eq!(resp.get_responses().len(), 1);
- assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
- }
+ assert_eq!(resp.get_responses().len(), 1);
+ assert_eq!(resp.get_responses()[0].get_cmd_type(), CmdType::Put);
}
Err(e) => {
panic!("has error: {:?}", e);
@@ -1192,6 +1221,7 @@ impl Cluster {
pub fn apply_state(&self, region_id: u64, store_id: u64) -> RaftApplyState {
let key = keys::apply_state_key(region_id);
+
self.get_engine(store_id)
.c()
.get_msg_cf::(engine_traits::CF_RAFT, &key)
@@ -1442,7 +1472,16 @@ impl Cluster {
}
pub fn wait_region_split(&mut self, region: &metapb::Region) {
- self.wait_region_split_max_cnt(region, 20, 250, true);
+ self.wait_region_split_max_cnt(
+ region,
+ 20,
+ if cfg!(feature = "test-raftstore-proxy") {
+ 250
+ } else {
+ 400
+ },
+ true,
+ );
}
pub fn wait_region_split_max_cnt(
@@ -1508,9 +1547,14 @@ impl Cluster {
}
pub fn try_merge(&mut self, source: u64, target: u64) -> RaftCmdResponse {
+ let duration = if cfg!(feature = "test-raftstore-proxy") {
+ 15
+ } else {
+ 5
+ };
self.call_command_on_leader(
self.new_prepare_merge(source, target),
- Duration::from_secs(5),
+ Duration::from_secs(duration),
)
.unwrap()
}
diff --git a/components/test_raftstore/src/pd.rs b/components/test_raftstore/src/pd.rs
index 5bbf248c3e7..960ec13bdcc 100644
--- a/components/test_raftstore/src/pd.rs
+++ b/components/test_raftstore/src/pd.rs
@@ -1070,7 +1070,14 @@ impl TestPdClient {
pub fn must_merge(&self, from: u64, target: u64) {
self.merge_region(from, target);
- self.check_merged_timeout(from, Duration::from_secs(5));
+ self.check_merged_timeout(
+ from,
+ Duration::from_secs(if cfg!(feature = "test-raftstore-proxy") {
+ 60
+ } else {
+ 15
+ }),
+ );
}
pub fn check_merged(&self, from: u64) -> bool {
@@ -1078,11 +1085,16 @@ impl TestPdClient {
}
pub fn check_merged_timeout(&self, from: u64, duration: Duration) {
+ let duration2 = if cfg!(feature = "test-raftstore-proxy") {
+ Duration::from_millis((duration.as_millis() as u64) * 5 as u64)
+ } else {
+ duration
+ };
let timer = Instant::now();
loop {
let region = block_on(self.get_region_by_id(from)).unwrap();
if let Some(r) = region {
- if timer.elapsed() > duration {
+ if timer.elapsed() > duration2 {
panic!("region {:?} is still not merged.", r);
}
} else {
@@ -1093,8 +1105,17 @@ impl TestPdClient {
}
pub fn region_leader_must_be(&self, region_id: u64, peer: metapb::Peer) {
- for _ in 0..500 {
- sleep_ms(10);
+ let num = if cfg!(feature = "test-raftstore-proxy") {
+ 3000
+ } else {
+ 1000
+ };
+ for _ in 0..num {
+ if cfg!(feature = "test-raftstore-proxy") {
+ sleep_ms(30);
+ } else {
+ sleep_ms(10);
+ }
if let Some(p) = self.cluster.rl().leaders.get(®ion_id) {
if *p == peer {
return;
@@ -1472,7 +1493,7 @@ impl PdClient for TestPdClient {
let mut id = pdpb::SplitId::default();
id.set_new_region_id(self.alloc_id().unwrap());
- for peer in region.get_peers() {
+ for _peer in region.get_peers() {
let rid = self.alloc_id().unwrap();
id.mut_new_peer_ids().push(rid);
}
diff --git a/components/test_raftstore/src/server.rs b/components/test_raftstore/src/server.rs
index 94bc5b54307..76f6e1cdaa3 100644
--- a/components/test_raftstore/src/server.rs
+++ b/components/test_raftstore/src/server.rs
@@ -304,7 +304,7 @@ impl Simulator for ServerCluster {
let check_leader_runner = CheckLeaderRunner::new(store_meta.clone());
let check_leader_scheduler = bg_worker.start("check-leader", check_leader_runner);
- let mut lock_mgr = LockManager::new();
+ let lock_mgr = LockManager::new();
let store = create_raft_storage(
engine,
&cfg.storage,
@@ -429,7 +429,7 @@ impl Simulator for ServerCluster {
let simulate_trans = SimulateTransport::new(trans);
let server_cfg = Arc::new(VersionTrack::new(cfg.server.clone()));
- let pessimistic_txn_cfg = cfg.pessimistic_txn;
+ let _pessimistic_txn_cfg = cfg.pessimistic_txn;
let split_check_runner =
SplitCheckRunner::new(engines.kv.clone(), router.clone(), coprocessor_host.clone());
diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs
index 2d3eb347969..649b8b93b41 100644
--- a/components/test_raftstore/src/util.rs
+++ b/components/test_raftstore/src/util.rs
@@ -64,7 +64,13 @@ pub fn must_get(engine: &Arc, cf: &str, key: &[u8], value: Option<&[u8]>) {
if value.is_none() && res.is_none() {
return;
}
- thread::sleep(Duration::from_millis(20));
+ thread::sleep(Duration::from_millis(
+ if cfg!(feature = "test-raftstore-proxy") {
+ 40
+ } else {
+ 20
+ },
+ ));
}
debug!(
"last try to get {} cf {}",
@@ -601,7 +607,7 @@ pub fn must_error_read_on_peer(
pub fn must_contains_error(resp: &RaftCmdResponse, msg: &str) {
let header = resp.get_header();
- assert!(header.has_error());
+ assert!(header.has_error(), "should have err {}", msg);
let err_msg = header.get_error().get_message();
assert!(err_msg.contains(msg), "{:?}", resp);
}
diff --git a/components/tikv_kv/src/lib.rs b/components/tikv_kv/src/lib.rs
index 78d4cf9e917..e73b37f6f63 100644
--- a/components/tikv_kv/src/lib.rs
+++ b/components/tikv_kv/src/lib.rs
@@ -54,7 +54,11 @@ use into_other::IntoOther;
use tikv_util::time::ThreadReadId;
pub const SEEK_BOUND: u64 = 8;
-const DEFAULT_TIMEOUT_SECS: u64 = 5;
+const DEFAULT_TIMEOUT_SECS: u64 = if cfg!(feature = "test-raftstore-proxy") {
+ 15
+} else {
+ 5
+};
pub type Callback = Box)) + Send>;
pub type ExtCallback = Box;
diff --git a/mock-engine-store/src/lib.rs b/mock-engine-store/src/lib.rs
index 866bef5a7cb..a46d8707829 100644
--- a/mock-engine-store/src/lib.rs
+++ b/mock-engine-store/src/lib.rs
@@ -3,27 +3,55 @@ use engine_store_ffi::interfaces::root::DB as ffi_interfaces;
use engine_store_ffi::EngineStoreServerHelper;
use engine_store_ffi::RaftStoreProxyFFIHelper;
use engine_store_ffi::UnwrapExternCFunc;
+use engine_traits::Peekable;
use engine_traits::{Engines, SyncMutable};
use engine_traits::{CF_DEFAULT, CF_LOCK, CF_WRITE};
+use kvproto::raft_serverpb::{
+ MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState,
+};
use protobuf::Message;
use raftstore::engine_store_ffi;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::pin::Pin;
use tikv_util::{debug, error, info, warn};
-// use kvproto::raft_serverpb::{
-// MergeState, PeerState, RaftApplyState, RaftLocalState, RaftSnapshotData, RegionLocalState,
-// };
type RegionId = u64;
#[derive(Default, Clone)]
pub struct Region {
region: kvproto::metapb::Region,
- peer: kvproto::metapb::Peer,
+ peer: kvproto::metapb::Peer, // What peer is me?
data: [BTreeMap, Vec>; 3],
apply_state: kvproto::raft_serverpb::RaftApplyState,
}
+pub fn make_new_region(
+ maybe_region: Option,
+ maybe_store_id: Option,
+) -> Region {
+ let mut region = Region {
+ region: maybe_region.unwrap_or(Default::default()),
+ ..Default::default()
+ };
+ if let Some(store_id) = maybe_store_id {
+ set_new_region_peer(&mut region, store_id);
+ }
+ region
+}
+
+fn set_new_region_peer(new_region: &mut Region, store_id: u64) {
+ if let Some(peer) = new_region
+ .region
+ .get_peers()
+ .iter()
+ .find(|&peer| peer.get_store_id() == store_id)
+ {
+ new_region.peer = peer.clone();
+ } else {
+ // This happens when region is not found.
+ }
+}
+
pub struct EngineStoreServer {
pub id: u64,
pub engines: Option>,
@@ -32,6 +60,7 @@ pub struct EngineStoreServer {
impl EngineStoreServer {
pub fn new(id: u64, engines: Option>) -> Self {
+ // The first region is added in cluster.rs
EngineStoreServer {
id,
engines,
@@ -47,6 +76,27 @@ pub struct EngineStoreServerWrap {
pub cluster_ptr: isize,
}
+fn hacked_is_real_no_region(region_id: u64, engine_store_server: &mut EngineStoreServer) {
+ if region_id == 1 {
+ // In some tests, region 1 is not created on all nodes after store is started.
+ // We need to double check rocksdb before we are sure there are no region 1.
+ let kv = &mut engine_store_server.engines.as_mut().unwrap().kv;
+ let local_state: Option = kv
+ .get_msg_cf(engine_traits::CF_RAFT, &keys::region_state_key(1))
+ .unwrap_or(None);
+ if local_state.is_none() {
+ panic!("Can find region 1 in storage");
+ }
+ engine_store_server.kvstore.insert(
+ region_id,
+ Box::new(make_new_region(
+ Some(local_state.unwrap().get_region().clone()),
+ Some(engine_store_server.id),
+ )),
+ );
+ }
+}
+
impl EngineStoreServerWrap {
pub fn new(
engine_store_server: *mut EngineStoreServer,
@@ -67,21 +117,204 @@ impl EngineStoreServerWrap {
header: ffi_interfaces::RaftCmdHeader,
) -> ffi_interfaces::EngineStoreApplyRes {
let region_id = header.region_id;
+ let node_id = (*self.engine_store_server).id;
info!("handle admin raft cmd"; "request"=>?req, "response"=>?resp, "index"=>header.index, "region-id"=>header.region_id);
- let do_handle_admin_raft_cmd = move |region: &mut Region| {
- if region.apply_state.get_applied_index() >= header.index {
- return ffi_interfaces::EngineStoreApplyRes::Persist;
- }
+ let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv;
+ let do_handle_admin_raft_cmd =
+ move |region: &mut Region, engine_store_server: &mut EngineStoreServer| {
+ if region.apply_state.get_applied_index() >= header.index {
+ return ffi_interfaces::EngineStoreApplyRes::Persist;
+ }
+ if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::BatchSplit {
+ let regions = resp.get_splits().regions.as_ref();
+
+ for i in 0..regions.len() {
+ let region_meta = regions.get(i).unwrap();
+ if region_meta.id == region_id {
+ // This is the region to split from
+ assert!(engine_store_server.kvstore.contains_key(®ion_meta.id));
+ engine_store_server
+ .kvstore
+ .get_mut(®ion_meta.id)
+ .unwrap()
+ .region = region_meta.clone();
+ } else {
+ // Should split data into new region
+ let mut new_region =
+ make_new_region(Some(region_meta.clone()), Some(node_id));
+
+ debug!(
+ "new region {} generated by split at node {} with meta {:?}",
+ region_meta.id, node_id, region_meta
+ );
+ new_region
+ .apply_state
+ .mut_truncated_state()
+ .set_index(raftstore::store::RAFT_INIT_LOG_INDEX);
+ new_region
+ .apply_state
+ .mut_truncated_state()
+ .set_term(raftstore::store::RAFT_INIT_LOG_TERM);
+ new_region
+ .apply_state
+ .set_applied_index(raftstore::store::RAFT_INIT_LOG_INDEX);
+
+ // No need to split data because all KV are stored in the same RocksDB
+
+ // We can't assert `region_meta.id` is brand new here
+ engine_store_server
+ .kvstore
+ .insert(region_meta.id, Box::new(new_region));
+ }
+ }
+ } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::PrepareMerge {
+ let tikv_region = resp.get_split().get_left();
+
+ let target = req.prepare_merge.as_ref().unwrap().target.as_ref();
+ let region_meta = &mut (engine_store_server
+ .kvstore
+ .get_mut(®ion_id)
+ .unwrap()
+ .region);
+ let region_epoch = region_meta.region_epoch.as_mut().unwrap();
+
+ let new_version = region_epoch.version + 1;
+ region_epoch.set_version(new_version);
+ assert_eq!(tikv_region.get_region_epoch().get_version(), new_version);
+
+ let conf_version = region_epoch.conf_ver + 1;
+ region_epoch.set_conf_ver(conf_version);
+ assert_eq!(tikv_region.get_region_epoch().get_conf_ver(), conf_version);
+
+ {
+ let region = engine_store_server.kvstore.get_mut(®ion_id).unwrap();
+ region.apply_state.set_applied_index(header.index);
+ }
+ // We don't handle MergeState and PeerState here
+ } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::CommitMerge {
+ {
+ let tikv_region_meta = resp.get_split().get_left();
+
+ let target_region =
+ &mut (engine_store_server.kvstore.get_mut(®ion_id).unwrap());
+ let target_region_meta = &mut target_region.region;
+ let target_version = target_region_meta.get_region_epoch().get_version();
+ let source_region = req.get_commit_merge().get_source();
+ let source_version = source_region.get_region_epoch().get_version();
+
+ let new_version = std::cmp::max(source_version, target_version) + 1;
+ target_region_meta
+ .mut_region_epoch()
+ .set_version(new_version);
+ assert_eq!(
+ target_region_meta.get_region_epoch().get_version(),
+ new_version
+ );
- ffi_interfaces::EngineStoreApplyRes::Persist
- };
+ // No need to merge data
+ let source_at_left = if source_region.get_start_key().is_empty() {
+ true
+ } else if target_region_meta.get_start_key().is_empty() {
+ false
+ } else {
+ source_region
+ .get_end_key()
+ .cmp(target_region_meta.get_start_key())
+ == std::cmp::Ordering::Equal
+ };
+
+ if source_at_left {
+ target_region_meta
+ .set_start_key(source_region.get_start_key().to_vec());
+ assert_eq!(
+ tikv_region_meta.get_start_key(),
+ target_region_meta.get_start_key()
+ );
+ } else {
+ target_region_meta.set_end_key(source_region.get_end_key().to_vec());
+ assert_eq!(
+ tikv_region_meta.get_end_key(),
+ target_region_meta.get_end_key()
+ );
+ }
+
+ {
+ target_region.apply_state.set_applied_index(header.index);
+ }
+ }
+ {
+ engine_store_server
+ .kvstore
+ .remove(&req.get_commit_merge().get_source().get_id());
+ }
+ } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::RollbackMerge {
+ let region = (engine_store_server.kvstore.get_mut(®ion_id).unwrap());
+ let region_meta = &mut region.region;
+ let new_version = region_meta.get_region_epoch().get_version() + 1;
+
+ region.apply_state.set_applied_index(header.index);
+ } else if req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::ChangePeer
+ || req.cmd_type == kvproto::raft_cmdpb::AdminCmdType::ChangePeerV2
+ {
+ let new_region_meta = resp.get_change_peer().get_region();
+
+ let old_peer_id = {
+ let old_region = engine_store_server.kvstore.get_mut(®ion_id).unwrap();
+ old_region.region = new_region_meta.clone();
+ old_region.apply_state.set_applied_index(header.index);
+ old_region.peer.get_id()
+ };
+
+ let mut do_remove = true;
+ for peer in new_region_meta.get_peers() {
+ if peer.get_id() == old_peer_id {
+ // Should not remove region
+ do_remove = false;
+ }
+ }
+ if do_remove {
+ let removed = engine_store_server.kvstore.remove(®ion_id);
+ // We need to also remove apply state, thus we need to know peer_id
+ debug!(
+ "Remove region {:?} peer_id {} at node {}",
+ removed.unwrap().region,
+ old_peer_id,
+ node_id
+ );
+ }
+ } else if [
+ kvproto::raft_cmdpb::AdminCmdType::CompactLog,
+ kvproto::raft_cmdpb::AdminCmdType::ComputeHash,
+ kvproto::raft_cmdpb::AdminCmdType::VerifyHash,
+ ]
+ .iter()
+ .cloned()
+ .collect::>()
+ .contains(&req.cmd_type)
+ {
+ let region = engine_store_server.kvstore.get_mut(®ion_id).unwrap();
+ region.apply_state.set_applied_index(header.index);
+ }
+ ffi_interfaces::EngineStoreApplyRes::Persist
+ };
+ if !(*self.engine_store_server).kvstore.contains_key(®ion_id) {
+ hacked_is_real_no_region(region_id, &mut *self.engine_store_server);
+ }
match (*self.engine_store_server).kvstore.entry(region_id) {
std::collections::hash_map::Entry::Occupied(mut o) => {
- do_handle_admin_raft_cmd(o.get_mut())
+ do_handle_admin_raft_cmd(o.get_mut(), &mut (*self.engine_store_server))
}
std::collections::hash_map::Entry::Vacant(v) => {
- warn!("region {} not found", region_id);
- do_handle_admin_raft_cmd(v.insert(Default::default()))
+ warn!(
+ "handle_admin_raft_cmd region {} not found at node {}",
+ region_id, node_id
+ );
+
+ // do_handle_admin_raft_cmd(
+ // v.insert(Box::new(make_new_region(None, Some(node_id)))),
+ // &mut (*self.engine_store_server),
+ // )
+ ffi_interfaces::EngineStoreApplyRes::NotFound
}
}
}
@@ -92,22 +325,25 @@ impl EngineStoreServerWrap {
header: ffi_interfaces::RaftCmdHeader,
) -> ffi_interfaces::EngineStoreApplyRes {
let region_id = header.region_id;
+ let node_id = (*self.engine_store_server).id;
let server = &mut (*self.engine_store_server);
let kv = &mut (*self.engine_store_server).engines.as_mut().unwrap().kv;
-
- let do_handle_write_raft_cmd = move |region: &mut Region| {
+ let mut do_handle_write_raft_cmd = move |region: &mut Region| {
if region.apply_state.get_applied_index() >= header.index {
+ debug!("handle_write_raft_cmd meet old index");
return ffi_interfaces::EngineStoreApplyRes::None;
}
+ debug!(
+ "handle_write_raft_cmd region {} node id {}",
+ region_id, server.id,
+ );
for i in 0..cmds.len {
let key = &*cmds.keys.add(i as _);
let val = &*cmds.vals.add(i as _);
debug!(
- "handle_write_raft_cmd add K {:?} V {:?} to region {} node id {}",
+ "handle_write_raft_cmd add K {:?} V {:?}",
key.to_slice(),
val.to_slice(),
- region_id,
- server.id
);
let tp = &*cmds.cmd_types.add(i as _);
let cf = &*cmds.cmd_cf.add(i as _);
@@ -120,7 +356,8 @@ impl EngineStoreServerWrap {
cf_to_name(cf.to_owned().into()),
&tikv_key,
&val.to_slice().to_vec(),
- );
+ )
+ .map_err(std::convert::identity);
}
engine_store_ffi::WriteCmdType::Del => {
let tikv_key = keys::data_key(key.to_slice());
@@ -128,17 +365,34 @@ impl EngineStoreServerWrap {
}
}
}
+ region.apply_state.set_applied_index(header.index);
+ persist_apply_state(
+ region,
+ kv,
+ region_id,
+ true,
+ false,
+ header.index,
+ header.term,
+ );
// Do not advance apply index
ffi_interfaces::EngineStoreApplyRes::None
};
+ if !(*self.engine_store_server).kvstore.contains_key(®ion_id) {
+ hacked_is_real_no_region(region_id, &mut *self.engine_store_server);
+ }
match (*self.engine_store_server).kvstore.entry(region_id) {
std::collections::hash_map::Entry::Occupied(mut o) => {
do_handle_write_raft_cmd(o.get_mut())
}
std::collections::hash_map::Entry::Vacant(v) => {
- warn!("region {} not found", region_id);
- do_handle_write_raft_cmd(v.insert(Default::default()))
+ warn!(
+ "handle_write_raft_cmd region {} not found at node {}",
+ region_id, node_id
+ );
+ // do_handle_write_raft_cmd(v.insert(Box::new(make_new_region(None, Some(node_id)))))
+ ffi_interfaces::EngineStoreApplyRes::NotFound
}
}
}
@@ -347,41 +601,42 @@ unsafe extern "C" fn ffi_pre_handle_snapshot(
term: u64,
) -> ffi_interfaces::RawCppPtr {
let store = into_engine_store_server_wrap(arg1);
+ let node_id = (*store.engine_store_server).id;
let proxy_helper = &mut *(store.maybe_proxy_helper.unwrap());
let kvstore = &mut (*store.engine_store_server).kvstore;
- let mut req = kvproto::metapb::Region::default();
+ let mut region_meta = kvproto::metapb::Region::default();
assert_ne!(region_buff.data, std::ptr::null());
assert_ne!(region_buff.len, 0);
- req.merge_from_bytes(region_buff.to_slice()).unwrap();
-
- let req_id = req.id;
+ region_meta
+ .merge_from_bytes(region_buff.to_slice())
+ .unwrap();
- let mut region = Region {
- region: req,
- peer: Default::default(),
- data: Default::default(),
- apply_state: Default::default(),
- };
+ let mut region = make_new_region(Some(region_meta), Some(node_id));
- debug!("apply snaps with len {}", snaps.len);
+ debug!(
+ "prehandle snapshot with len {} node_id {} peer_id {}",
+ snaps.len, node_id, peer_id
+ );
for i in 0..snaps.len {
let mut snapshot = snaps.views.add(i as usize);
let mut sst_reader =
SSTReader::new(proxy_helper, &*(snapshot as *mut ffi_interfaces::SSTView));
{
- region.apply_state.set_applied_index(index);
region.apply_state.mut_truncated_state().set_index(index);
region.apply_state.mut_truncated_state().set_term(term);
+ {
+ region.apply_state.set_applied_index(index);
+ }
}
while sst_reader.remained() {
let key = sst_reader.key();
let value = sst_reader.value();
- let cf_index = (*snapshot).type_ as u8;
- let data = &mut region.data[cf_index as usize];
+ let cf_index = (*snapshot).type_ as usize;
+ let data = &mut region.data[cf_index];
let _ = data.insert(key.to_slice().to_vec(), value.to_slice().to_vec());
sst_reader.next();
@@ -401,7 +656,6 @@ pub fn cf_to_name(cf: ffi_interfaces::ColumnFamilyType) -> &'static str {
ffi_interfaces::ColumnFamilyType::Lock => CF_LOCK,
ffi_interfaces::ColumnFamilyType::Write => CF_WRITE,
ffi_interfaces::ColumnFamilyType::Default => CF_DEFAULT,
- _ => unreachable!(),
}
}
@@ -416,6 +670,7 @@ unsafe extern "C" fn ffi_apply_pre_handled_snapshot(
let req_id = req.region.as_ref().unwrap().region.id;
+ // Though we do not write to kvstore in memory now, we still need to maintain regions.
&(*store.engine_store_server)
.kvstore
.insert(req_id, Box::new(req.region.take().unwrap()));
@@ -425,12 +680,18 @@ unsafe extern "C" fn ffi_apply_pre_handled_snapshot(
.get_mut(&req_id)
.unwrap();
+ debug!(
+ "apply pre-handled snapshot on new_region {} at store {}",
+ req_id, node_id
+ );
+
let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv;
for cf in 0..3 {
for (k, v) in std::mem::take(region.data.as_mut().get_mut(cf).unwrap()).into_iter() {
let tikv_key = keys::data_key(k.as_slice());
let cf_name = cf_to_name(cf.into());
- kv.put_cf(cf_name, &tikv_key, &v);
+ kv.put_cf(cf_name, &tikv_key, &v)
+ .map_err(std::convert::identity);
}
}
}
@@ -447,38 +708,81 @@ unsafe extern "C" fn ffi_handle_ingest_sst(
let region_id = header.region_id;
let kvstore = &mut (*store.engine_store_server).kvstore;
let kv = &mut (*store.engine_store_server).engines.as_mut().unwrap().kv;
- let region = kvstore.get_mut(®ion_id).unwrap().as_mut();
-
- let index = header.index;
- let term = header.term;
+ let region = kvstore.get_mut(®ion_id).unwrap();
for i in 0..snaps.len {
- let mut snapshot = snaps.views.add(i as usize);
+ let snapshot = snaps.views.add(i as usize);
let mut sst_reader =
SSTReader::new(proxy_helper, &*(snapshot as *mut ffi_interfaces::SSTView));
while sst_reader.remained() {
let key = sst_reader.key();
let value = sst_reader.value();
-
- let cf_index = (*snapshot).type_ as u8;
-
let tikv_key = keys::data_key(key.to_slice());
let cf_name = cf_to_name((*snapshot).type_);
- kv.put_cf(cf_name, &tikv_key, &value.to_slice());
+ kv.put_cf(cf_name, &tikv_key, &value.to_slice())
+ .map_err(std::convert::identity);
sst_reader.next();
}
}
- {
- region.apply_state.set_applied_index(index);
- region.apply_state.mut_truncated_state().set_index(index);
- region.apply_state.mut_truncated_state().set_term(term);
- }
-
+ // Since tics#1811, Br/Lightning will always ingest both WRITE and DEFAULT, so we can always persist, rather than wait.
ffi_interfaces::EngineStoreApplyRes::Persist
}
+fn persist_apply_state(
+ region: &mut Region,
+ kv: &mut RocksEngine,
+ region_id: u64,
+ persist_apply_index: bool,
+ persist_truncated_state: bool,
+ potential_index: u64,
+ potential_term: u64,
+) {
+ let apply_key = keys::apply_state_key(region_id);
+ let mut old_apply_state = kv
+ .get_msg_cf::(engine_traits::CF_RAFT, &apply_key)
+ .unwrap_or(None);
+ if old_apply_state.is_none() {
+ // Have not set apply_state, use ours
+ kv.put_cf(
+ engine_traits::CF_RAFT,
+ &apply_key,
+ ®ion.apply_state.write_to_bytes().unwrap(),
+ )
+ .map_err(std::convert::identity);
+ } else {
+ let old_apply_state = old_apply_state.as_mut().unwrap();
+ if persist_apply_index {
+ old_apply_state.set_applied_index(region.apply_state.get_applied_index());
+ if potential_index > old_apply_state.get_commit_index()
+ || potential_term > old_apply_state.get_commit_term()
+ {
+ old_apply_state.set_commit_index(potential_index);
+ old_apply_state.set_commit_term(potential_term);
+ region.apply_state.set_commit_index(potential_index);
+ region.apply_state.set_commit_term(potential_term);
+ }
+ }
+ if persist_truncated_state {
+ old_apply_state
+ .mut_truncated_state()
+ .set_index(region.apply_state.get_truncated_state().get_index());
+ old_apply_state
+ .mut_truncated_state()
+ .set_term(region.apply_state.get_truncated_state().get_term());
+ }
+ if persist_apply_index || persist_truncated_state {
+ kv.put_cf(
+ engine_traits::CF_RAFT,
+ &apply_key,
+ &old_apply_state.write_to_bytes().unwrap(),
+ )
+ .map_err(std::convert::identity);
+ }
+ }
+}
+
unsafe extern "C" fn ffi_handle_compute_store_stats(
arg1: *mut ffi_interfaces::EngineStoreServerWrap,
) -> ffi_interfaces::StoreStats {
@@ -495,3 +799,6 @@ unsafe extern "C" fn ffi_handle_compute_store_stats(
engine_keys_read: 0,
}
}
+
+unsafe impl Sync for EngineStoreServer {}
+unsafe impl Sync for EngineStoreServerWrap {}
diff --git a/tests/failpoints/cases/mod.rs b/tests/failpoints/cases/mod.rs
index f5e979c2c83..9253363a16e 100644
--- a/tests/failpoints/cases/mod.rs
+++ b/tests/failpoints/cases/mod.rs
@@ -17,6 +17,7 @@ mod test_pending_peers;
mod test_replica_read;
mod test_replica_stale_read;
mod test_server;
+mod test_snap;
mod test_split_region;
mod test_stale_peer;
mod test_stale_read;
diff --git a/tests/failpoints/cases/test_bootstrap.rs b/tests/failpoints/cases/test_bootstrap.rs
index 6cd9a48eaa4..f047a6cdc0c 100644
--- a/tests/failpoints/cases/test_bootstrap.rs
+++ b/tests/failpoints/cases/test_bootstrap.rs
@@ -11,9 +11,6 @@ fn test_bootstrap_half_way_failure(fp: &str) {
let pd_client = Arc::new(TestPdClient::new(0, false));
let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone())));
let mut cluster = Cluster::new(0, 5, sim, pd_client);
- unsafe {
- test_raftstore::init_cluster_ptr(&cluster);
- }
// Try to start this node, return after persisted some keys.
fail::cfg(fp, "return").unwrap();
diff --git a/tests/failpoints/cases/test_compact_log.rs b/tests/failpoints/cases/test_compact_log.rs
index 78cae076dcf..2be572931ce 100644
--- a/tests/failpoints/cases/test_compact_log.rs
+++ b/tests/failpoints/cases/test_compact_log.rs
@@ -58,7 +58,11 @@ fn test_evict_entry_cache() {
fail::cfg("needs_evict_entry_cache", "return").unwrap();
fail::cfg("on_raft_gc_log_tick_1", "off").unwrap();
- sleep_ms(500); // Wait to trigger a raft log compaction.
+ sleep_ms(if cfg!(feature = "test-raftstore-proxy") {
+ 700
+ } else {
+ 500
+ }); // Wait to trigger a raft log compaction.
let entry_cache_size = MEMTRACE_ENTRY_CACHE.sum();
// Entries on store 1 will be evict even if they are still in life time.
assert!(entry_cache_size < 50 * 1024);
diff --git a/tests/failpoints/cases/test_normal.rs b/tests/failpoints/cases/test_normal.rs
index 381166c5a23..4de32155c68 100644
--- a/tests/failpoints/cases/test_normal.rs
+++ b/tests/failpoints/cases/test_normal.rs
@@ -3,20 +3,16 @@
use std::sync::{Arc, RwLock};
use engine_traits::{IterOptions, Iterable, Iterator, Peekable};
-use kvproto::{metapb, raft_serverpb};
use mock_engine_store;
use test_raftstore::*;
+
#[test]
fn test_normal() {
let pd_client = Arc::new(TestPdClient::new(0, false));
let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone())));
let mut cluster = Cluster::new(0, 3, sim, pd_client);
- unsafe {
- test_raftstore::init_cluster_ptr(&cluster);
- }
- // Try to start this node, return after persisted some keys.
- let result = cluster.start();
+ cluster.run();
let k = b"k1";
let v = b"v1";
diff --git a/tests/failpoints/cases/test_server.rs b/tests/failpoints/cases/test_server.rs
index ea8ae8b8eaa..93f5d8f3cc5 100644
--- a/tests/failpoints/cases/test_server.rs
+++ b/tests/failpoints/cases/test_server.rs
@@ -4,6 +4,22 @@ use pd_client::PdClient;
use raft::eraftpb::MessageType;
use test_raftstore::*;
+fn get_addr(pd_client: &std::sync::Arc, node_id: u64) -> String {
+ if cfg!(feature = "test-raftstore-proxy") {
+ pd_client
+ .get_store(node_id)
+ .unwrap()
+ .get_peer_address()
+ .to_string()
+ } else {
+ pd_client
+ .get_store(node_id)
+ .unwrap()
+ .get_address()
+ .to_string()
+ }
+}
+
/// When encountering raft/batch_raft mismatch store id error, the service is expected
/// to drop connections in order to let raft_client re-resolve store address from PD
/// This will make the mismatch error be automatically corrected.
@@ -23,22 +39,9 @@ fn test_mismatch_store_node() {
must_get_equal(&cluster.get_engine(node1_id), b"k1", b"v1");
must_get_equal(&cluster.get_engine(node2_id), b"k1", b"v1");
must_get_equal(&cluster.get_engine(node3_id), b"k1", b"v1");
- let node1_addr = pd_client
- .get_store(node1_id)
- .unwrap()
- .get_address()
- .to_string();
- let node2_addr = pd_client
- .get_store(node2_id)
- .unwrap()
- .get_address()
- .to_string();
- let node3_addr = cluster
- .pd_client
- .get_store(node3_id)
- .unwrap()
- .get_address()
- .to_string();
+ let node1_addr = get_addr(&pd_client, node1_id);
+ let node2_addr = get_addr(&pd_client, node2_id);
+ let node3_addr = get_addr(&pd_client, node3_id);
cluster.stop_node(node2_id);
cluster.stop_node(node3_id);
// run node2
@@ -58,18 +61,9 @@ fn test_mismatch_store_node() {
sleep_ms(600);
fail::cfg("mock_store_refresh_interval_secs", "return(0)").unwrap();
cluster.must_put(b"k2", b"v2");
- assert_eq!(
- node1_addr,
- pd_client.get_store(node1_id).unwrap().get_address()
- );
- assert_eq!(
- node3_addr,
- pd_client.get_store(node2_id).unwrap().get_address()
- );
- assert_eq!(
- node2_addr,
- cluster.pd_client.get_store(node3_id).unwrap().get_address()
- );
+ assert_eq!(node1_addr, get_addr(&pd_client, node1_id));
+ assert_eq!(node3_addr, get_addr(&pd_client, node2_id));
+ assert_eq!(node2_addr, get_addr(&pd_client, node3_id));
must_get_equal(&cluster.get_engine(node3_id), b"k2", b"v2");
must_get_equal(&cluster.get_engine(node2_id), b"k2", b"v2");
fail::remove("mock_store_refresh_interval_secs");
diff --git a/tests/failpoints/cases/test_snap.rs b/tests/failpoints/cases/test_snap.rs
index f1d978f0fdf..01d032c2349 100644
--- a/tests/failpoints/cases/test_snap.rs
+++ b/tests/failpoints/cases/test_snap.rs
@@ -90,7 +90,14 @@ fn test_server_snapshot_on_resolve_failure() {
must_get_none(&engine2, b"k1");
// If snapshot status is reported correctly, sending snapshot should be retried.
- notify_rx.recv_timeout(Duration::from_secs(3)).unwrap();
+ #[cfg(feature = "test-raftstore-proxy")]
+ {
+ notify_rx.recv_timeout(Duration::from_secs(5)).unwrap();
+ }
+ #[cfg(not(feature = "test-raftstore-proxy"))]
+ {
+ notify_rx.recv_timeout(Duration::from_secs(3)).unwrap();
+ }
}
#[test]
diff --git a/tests/failpoints/cases/test_split_region.rs b/tests/failpoints/cases/test_split_region.rs
index b33e644df75..513e0fea00a 100644
--- a/tests/failpoints/cases/test_split_region.rs
+++ b/tests/failpoints/cases/test_split_region.rs
@@ -15,6 +15,7 @@ use raftstore::Result;
use tikv_util::HandyRwLock;
use collections::HashMap;
+use engine_traits::Peekable;
use test_raftstore::*;
use tikv_util::config::{ReadableDuration, ReadableSize};
@@ -363,7 +364,7 @@ fn test_split_not_to_split_existing_tombstone_region() {
fail::remove(before_check_snapshot_1_2_fp);
// Wait for the logs
- sleep_ms(100);
+ sleep_ms(3000);
// If left_peer_2 can be created, dropping all msg to make it exist.
cluster.add_send_filter(IsolationFilterFactory::new(2));
diff --git a/tests/integrations/raftstore/mod.rs b/tests/integrations/raftstore/mod.rs
index c5c129361f7..8c9682fd124 100644
--- a/tests/integrations/raftstore/mod.rs
+++ b/tests/integrations/raftstore/mod.rs
@@ -1,5 +1,6 @@
// Copyright 2016 TiKV Project Authors. Licensed under Apache-2.0.
+mod test_batch_read_index;
mod test_bootstrap;
mod test_clear_stale_data;
mod test_compact_after_delete;
diff --git a/tests/integrations/raftstore/test_batch_read_index.rs b/tests/integrations/raftstore/test_batch_read_index.rs
new file mode 100644
index 00000000000..cba8f7ec270
--- /dev/null
+++ b/tests/integrations/raftstore/test_batch_read_index.rs
@@ -0,0 +1,70 @@
+// Copyright 2021 TiKV Project Authors. Licensed under Apache-2.0.
+
+use std::sync::{Arc, RwLock};
+
+use engine_rocks::Compat;
+use engine_traits::{IterOptions, Iterable, Iterator, Peekable};
+use kvproto::{kvrpcpb, metapb, raft_serverpb};
+use mock_engine_store;
+use raftstore::engine_store_ffi::*;
+use std::time::Duration;
+use test_raftstore::*;
+
+#[test]
+fn test_batch_read_index() {
+ let pd_client = Arc::new(TestPdClient::new(0, false));
+ let sim = Arc::new(RwLock::new(NodeCluster::new(pd_client.clone())));
+ let mut cluster = Cluster::new(0, 3, sim, pd_client);
+
+ cluster.run();
+
+ let k = b"k1";
+ let v = b"v1";
+ cluster.must_put(k, v);
+
+ let key = cluster.ffi_helper_set.keys().next().unwrap();
+ let proxy_helper = cluster
+ .ffi_helper_set
+ .get(&key)
+ .unwrap()
+ .proxy_helper
+ .as_ref();
+
+ let mut req = kvrpcpb::ReadIndexRequest::default();
+
+ let region = cluster.get_region(b"k1");
+
+ let mut key_range = kvrpcpb::KeyRange::default();
+ key_range.set_start_key(region.get_start_key().to_vec());
+ key_range.set_end_key(region.get_end_key().to_vec());
+ req.mut_ranges().push(key_range);
+
+ let context = req.mut_context();
+
+ context.set_region_id(region.get_id());
+ context.set_peer(region.get_peers().first().unwrap().clone());
+ context
+ .mut_region_epoch()
+ .set_version(region.get_region_epoch().get_version());
+ context
+ .mut_region_epoch()
+ .set_conf_ver(region.get_region_epoch().get_conf_ver());
+
+ sleep_ms(100);
+ let req_vec = vec![req];
+ let res = unsafe {
+ proxy_helper
+ .proxy_ptr
+ .as_ref()
+ .read_index_client
+ .batch_read_index(req_vec, Duration::from_millis(100))
+ };
+
+ assert_eq!(res.len(), 1);
+ let res = &res[0];
+ // Put (k1,v1) has index 7
+ assert_eq!(res.0.get_read_index(), 7);
+ assert_eq!(res.1, region.get_id());
+
+ cluster.shutdown();
+}
diff --git a/tests/integrations/raftstore/test_bootstrap.rs b/tests/integrations/raftstore/test_bootstrap.rs
index 1259b4f221c..d79acd443e0 100644
--- a/tests/integrations/raftstore/test_bootstrap.rs
+++ b/tests/integrations/raftstore/test_bootstrap.rs
@@ -36,11 +36,12 @@ fn test_bootstrap_idempotent(cluster: &mut Cluster) {
#[test]
fn test_node_bootstrap_with_prepared_data() {
+ test_raftstore::init_global_ffi_helper_set();
// create a node
let pd_client = Arc::new(TestPdClient::new(0, false));
let cfg = new_tikv_config(0);
- let (_, system) = fsm::create_raft_batch_system(&cfg.raft_store);
+ let (router, system) = fsm::create_raft_batch_system(&cfg.raft_store);
let simulate_trans = SimulateTransport::new(ChannelTransport::new());
let tmp_path = Builder::new().prefix("test_cluster").tempdir().unwrap();
let engine = Arc::new(
@@ -56,6 +57,14 @@ fn test_node_bootstrap_with_prepared_data() {
RocksEngine::from_db(Arc::clone(&engine)),
RocksEngine::from_db(Arc::clone(&raft_engine)),
);
+ let (ffi_helper_set, cfg) = Cluster::::make_ffi_helper_set_no_bind(
+ 0,
+ engines.clone(),
+ &None,
+ &router,
+ cfg,
+ 0,
+ );
let tmp_mgr = Builder::new().prefix("test_cluster").tempdir().unwrap();
let bg_worker = WorkerBuilder::new("background").thread_count(2).create();
let mut node = Node::new(
diff --git a/tests/integrations/raftstore/test_compact_lock_cf.rs b/tests/integrations/raftstore/test_compact_lock_cf.rs
index 703e49169ef..195bc2d62a4 100644
--- a/tests/integrations/raftstore/test_compact_lock_cf.rs
+++ b/tests/integrations/raftstore/test_compact_lock_cf.rs
@@ -5,6 +5,8 @@ use engine_traits::{MiscExt, CF_LOCK};
use test_raftstore::*;
use tikv_util::config::*;
+use engine_traits::KvEngine;
+
fn flush(cluster: &mut Cluster) {
for engines in cluster.engines.values() {
engines.kv.flush_cf(CF_LOCK, true).unwrap();
@@ -13,8 +15,16 @@ fn flush(cluster: &mut Cluster) {
fn flush_then_check(cluster: &mut Cluster, interval: u64, written: bool) {
flush(cluster);
+
// Wait for compaction.
- sleep_ms(interval * 2);
+ sleep_ms(
+ interval
+ * if cfg!(feature = "test-raftstore-proxy") {
+ 3
+ } else {
+ 2
+ },
+ );
for engines in cluster.engines.values() {
let compact_write_bytes = engines
.kv
diff --git a/tests/integrations/raftstore/test_early_apply.rs b/tests/integrations/raftstore/test_early_apply.rs
index cb58bb9b1dc..412e6a7d2c4 100644
--- a/tests/integrations/raftstore/test_early_apply.rs
+++ b/tests/integrations/raftstore/test_early_apply.rs
@@ -1,6 +1,7 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.
use engine_rocks::RocksSnapshot;
+use protobuf::Message;
use raft::eraftpb::MessageType;
use raftstore::store::*;
use std::time::*;
@@ -22,6 +23,12 @@ enum DataLost {
AllLost,
}
+static DURATION: u64 = if cfg!(feature = "test-raftstore-proxy") {
+ 8
+} else {
+ 3
+};
+
fn test(cluster: &mut Cluster, action: A, check: C, mode: DataLost)
where
A: FnOnce(&mut Cluster),
@@ -38,13 +45,14 @@ where
cluster.add_send_filter(CloneFilterFactory(filter));
let last_index = cluster.raft_local_state(1, 1).get_last_index();
action(cluster);
- cluster.wait_last_index(1, 1, last_index + 1, Duration::from_secs(3));
+ cluster.wait_last_index(1, 1, last_index + 1, Duration::from_secs(DURATION));
let mut snaps = vec![];
snaps.push((1, RocksSnapshot::new(cluster.get_raft_engine(1))));
+
if mode == DataLost::AllLost {
- cluster.wait_last_index(1, 2, last_index + 1, Duration::from_secs(3));
+ cluster.wait_last_index(1, 2, last_index + 1, Duration::from_secs(DURATION));
snaps.push((2, RocksSnapshot::new(cluster.get_raft_engine(2))));
- cluster.wait_last_index(1, 3, last_index + 1, Duration::from_secs(3));
+ cluster.wait_last_index(1, 3, last_index + 1, Duration::from_secs(DURATION));
snaps.push((3, RocksSnapshot::new(cluster.get_raft_engine(3))));
}
cluster.clear_send_filters();
@@ -153,7 +161,7 @@ fn test_update_internal_apply_index() {
cluster.async_put(b"k2", b"v2").unwrap();
let mut snaps = vec![];
for i in 1..3 {
- cluster.wait_last_index(1, i, last_index + 2, Duration::from_secs(3));
+ cluster.wait_last_index(1, i, last_index + 2, Duration::from_secs(DURATION));
snaps.push((i, RocksSnapshot::new(cluster.get_raft_engine(1))));
}
cluster.clear_send_filters();
diff --git a/tests/integrations/raftstore/test_hibernate.rs b/tests/integrations/raftstore/test_hibernate.rs
index daa40d4bcaa..ea38ffc20d6 100644
--- a/tests/integrations/raftstore/test_hibernate.rs
+++ b/tests/integrations/raftstore/test_hibernate.rs
@@ -11,6 +11,12 @@ use raft::eraftpb::{ConfChangeType, MessageType};
use test_raftstore::*;
use tikv_util::HandyRwLock;
+const INTERVAL_TIMES: u32 = if cfg!(feature = "test-raftstore-proxy") {
+ 5
+} else {
+ 2
+};
+
#[test]
fn test_proposal_prevent_sleep() {
let mut cluster = new_node_cluster(0, 3);
@@ -299,7 +305,7 @@ fn test_inconsistent_configuration() {
}))
.when(filter.clone()),
));
- thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * 2);
+ thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * INTERVAL_TIMES);
assert!(!awakened.load(Ordering::SeqCst));
// Simulate rolling disable hibernate region in followers
@@ -317,7 +323,7 @@ fn test_inconsistent_configuration() {
);
awakened.store(false, Ordering::SeqCst);
filter.store(true, Ordering::SeqCst);
- thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * 2);
+ thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * INTERVAL_TIMES);
// Leader should keep awake as peer 3 won't agree to sleep.
assert!(awakened.load(Ordering::SeqCst));
cluster.reset_leader_of_region(1);
@@ -396,7 +402,7 @@ fn test_hibernate_feature_gate() {
);
awakened.store(false, Ordering::SeqCst);
filter.store(true, Ordering::SeqCst);
- thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * 2);
+ thread::sleep(cluster.cfg.raft_store.raft_heartbeat_interval() * INTERVAL_TIMES);
// Leader can go to sleep as version requirement is met.
assert!(!awakened.load(Ordering::SeqCst));
}
diff --git a/tests/integrations/raftstore/test_merge.rs b/tests/integrations/raftstore/test_merge.rs
index d9205674cc3..f86410a8534 100644
--- a/tests/integrations/raftstore/test_merge.rs
+++ b/tests/integrations/raftstore/test_merge.rs
@@ -190,6 +190,14 @@ fn test_node_merge_with_slow_learner() {
#[cfg(feature = "protobuf-codec")]
#[test]
fn test_node_merge_prerequisites_check() {
+ let get_global = if cfg!(feature = "test-raftstore-proxy") {
+ // This test can print too much log, so disable log here
+ let get_global = ::slog_global::get_global();
+ ::slog_global::clear_global();
+ Some(get_global)
+ } else {
+ None
+ };
let mut cluster = new_node_cluster(0, 3);
configure_for_merge(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
@@ -265,6 +273,10 @@ fn test_node_merge_prerequisites_check() {
cluster.clear_send_filters();
cluster.must_put(b"k24", b"v24");
must_get_equal(&cluster.get_engine(3), b"k24", b"v24");
+
+ if cfg!(feature = "test-raftstore-proxy") {
+ ::slog_global::set_global((*(get_global.unwrap())).clone());
+ }
}
/// Test if stale peer will be handled properly after merge.
@@ -584,6 +596,7 @@ fn test_node_merge_brain_split() {
/// Test whether approximate size and keys are updated after merge
#[test]
+#[cfg(not(feature = "test-raftstore-proxy"))]
fn test_merge_approximate_size_and_keys() {
let mut cluster = new_node_cluster(0, 3);
cluster.cfg.raft_store.split_region_check_tick_interval = ReadableDuration::millis(20);
diff --git a/tests/integrations/raftstore/test_single.rs b/tests/integrations/raftstore/test_single.rs
index 41285f734ed..3e9a1e277c9 100644
--- a/tests/integrations/raftstore/test_single.rs
+++ b/tests/integrations/raftstore/test_single.rs
@@ -127,6 +127,7 @@ fn test_node_delete() {
test_delete(&mut cluster);
}
+#[cfg(not(feature = "test-raftstore-proxy"))]
#[test]
fn test_node_use_delete_range() {
let mut cluster = new_node_cluster(0, 1);
@@ -137,6 +138,7 @@ fn test_node_use_delete_range() {
test_delete_range(&mut cluster, CF_WRITE);
}
+#[cfg(not(feature = "test-raftstore-proxy"))]
#[test]
fn test_node_not_use_delete_range() {
let mut cluster = new_node_cluster(0, 1);
@@ -192,11 +194,15 @@ fn test_node_apply_no_op() {
let timer = Instant::now();
loop {
let state = cluster.apply_state(1, 1);
+ // When new leader is elected, should apply one no-op entry
if state.get_applied_index() > RAFT_INIT_LOG_INDEX {
break;
}
if timer.elapsed() > Duration::from_secs(3) {
- panic!("apply no-op log not finish after 3 seconds");
+ panic!(
+ "apply no-op log not finish after 3 seconds, now {}",
+ state.get_applied_index()
+ );
}
sleep_ms(10);
}
diff --git a/tests/integrations/raftstore/test_snap.rs b/tests/integrations/raftstore/test_snap.rs
index 53f82eb3399..cf7c1b783c0 100644
--- a/tests/integrations/raftstore/test_snap.rs
+++ b/tests/integrations/raftstore/test_snap.rs
@@ -444,6 +444,7 @@ fn test_node_snapshot_with_append() {
}
#[test]
+// #[cfg(not(feature = "test-raftstore-proxy"))]
fn test_server_snapshot_with_append() {
let mut cluster = new_server_cluster(0, 4);
test_snapshot_with_append(&mut cluster);
diff --git a/tests/integrations/raftstore/test_split_region.rs b/tests/integrations/raftstore/test_split_region.rs
index d648d32ac5d..c0b531d0cf9 100644
--- a/tests/integrations/raftstore/test_split_region.rs
+++ b/tests/integrations/raftstore/test_split_region.rs
@@ -213,9 +213,15 @@ fn test_auto_split_region(cluster: &mut Cluster) {
let epoch = left.get_region_epoch().clone();
let get = new_request(left.get_id(), epoch, vec![new_get_cmd(&max_key)], false);
- let resp = cluster
- .call_command_on_leader(get, Duration::from_secs(5))
- .unwrap();
+ let resp = if cfg!(feature = "test-raftstore-proxy") {
+ cluster
+ .call_command_on_leader(get, Duration::from_secs(10))
+ .unwrap()
+ } else {
+ cluster
+ .call_command_on_leader(get, Duration::from_secs(5))
+ .unwrap()
+ };
assert!(resp.get_header().has_error());
assert!(resp.get_header().get_error().has_key_not_in_region());
}
diff --git a/tests/integrations/server/status_server.rs b/tests/integrations/server/status_server.rs
index f34cf44c9b9..770718876de 100644
--- a/tests/integrations/server/status_server.rs
+++ b/tests/integrations/server/status_server.rs
@@ -45,14 +45,12 @@ fn test_region_meta_endpoint() {
assert!(router.is_some());
let mut status_server = unsafe {
- let helperset = &*cluster
- .global_engine_helper_set
+ let helperset = &test_raftstore::get_global_engine_helper_set()
.as_ref()
.unwrap()
.engine_store_server_helper;
- let helperptr = helperset as *const EngineStoreServerHelper;
StatusServer::new(
- &*helperptr,
+ &*helperset,
1,
None,
ConfigController::default(),