Skip to content

Support on_apply_snapshot_committed to delete useless FAP snapshot #428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
}
}

// Only use after phase 1 is finished.
// Only use after phase 1 is finished and the FAP snapshot is not usable;
// otherwise, just call `fallback_to_slow_path` and let `post_apply_snapshot`
// decide.
pub fn fap_fallback_to_slow(&self, region_id: u64) {
self.engine_store_server_helper
.clear_fap_snapshot(region_id);
.clear_fap_snapshot(region_id, 0); // 0 for fallback
let mut wb = self.raft_engine.log_batch(2);
let raft_state = kvproto::raft_serverpb::RaftLocalState::default();
let _ = self.raft_engine.clean(region_id, 0, &raft_state, &mut wb);
Expand All @@ -58,7 +60,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
) {
if clean_fap_snapshot {
self.engine_store_server_helper
.clear_fap_snapshot(region_id);
.clear_fap_snapshot(region_id, 0); // 0 for fallback
}
let mut wb = self.raft_engine.log_batch(2);
let raft_state = kvproto::raft_serverpb::RaftLocalState::default();
Expand Down Expand Up @@ -475,7 +477,9 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
// We call fallback here even if the fap is persisted and sent.
// Because the sent snapshot is only to be handled if (index, term) matches,
// even if there is another normal snapshot. Because both snapshots are
// identical. TODO However, we can retry FAP for
// identical.
// IMPORTANT!! Do not clear the fap snapshot here, they can be still useful.
// TODO However, we can retry FAP for
// several times before we fail. However,
// the cases here are rare. We have only observed several raft logs missing
// problem.
Expand All @@ -486,6 +490,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
};
}
Err(e) => {
// TODO we have noticed that in cse, the snapshot could not be sent.
info!(
"fast path: ongoing {}:{} {} failed. build and sent snapshot error {:?}",
self.store_id, region_id, new_peer_id, e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,21 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
region_id,
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
MapEntry::Occupied(_) => {
if !self.engine_store_server_helper.kvstore_region_exist(region_id) {
if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id, snap_key.idx, snap_key.term) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
info!("fast path: prehandle first snapshot skipped {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
should_skip = true;
}
let already_existed = self
.engine_store_server_helper
.kvstore_region_exist(region_id);
if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id, snap_key.idx, snap_key.term) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
info!("fast path: prehandle first snapshot skipped {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"already_existed" => already_existed,
);
should_skip = true;
}
}
MapEntry::Vacant(_) => {
// It won't go here because cached region info is inited after restart and on the first fap message.
// It won't go here because cached region info is inited after restart and on the first fap message mostly.
// However, it could happen when a snapshot is replayed in `apply_snap`.
let pstate = self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id, snap_key.idx, snap_key.term);
if pstate == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
// We have a fap snapshot now. skip
Expand Down Expand Up @@ -82,16 +85,12 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
let already_existed = self
.engine_store_server_helper
.kvstore_region_exist(region_id);
if already_existed {
debug!("fast path: skip apply snapshot because not first {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
);
return false;
}

info!("fast path: start applying first fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"already_existed" => already_existed,
"init" => self.is_initialized(region_id)
);
// Even if the feature is not enabled, the snapshot could still be a previously
// generated fap snapshot. So we have to also handle this snapshot,
Expand Down Expand Up @@ -123,6 +122,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
"current_enabled" => current_enabled,
"tag" => tag
);
fail::fail_point!("fap_core_must_shared_snapshot", |_| { return false });
if expected_snapshot_type == SnapshotDeducedType::Fap {
// It won't actually happen because TiFlash will panic since `assert_exist` is
// true in this case.
Expand Down Expand Up @@ -164,6 +164,8 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
) {
return quit_apply_fap("apply");
}

fail::fail_point!("fap_core_must_not_shared_snapshot", |_| { return false });
// If it's a regular snapshot that has the same (index, term) as the fap snapshot,
// it makes no difference which snapshot we actually applied.
// So we always choose to apply a fap snapshot, since it saves us from
Expand Down Expand Up @@ -206,10 +208,13 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
applied_fap = try_apply_fap_snapshot(o.get().clone());
}
MapEntry::Vacant(_) => {
// It won't go here because cached region info is inited after restart and on the first fap message.
// It won't go here because cached region info is inited after restart and on the first fap message mostly.
// However, it could happen when a snapshot is replayed in `apply_snap`.
let inited = self.is_initialized(region_id);
info!("fast path: check should apply fap snapshot noexist {}:{} {}", self.store_id, region_id, peer_id;
"snap_key" => ?snap_key,
"region_id" => region_id,
"inited" => inited,
);
assert!(self.is_initialized(region_id));
let o = Arc::new(CachedRegionInfo::default());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,44 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
true
}

/// Hook invoked after an applied snapshot has been committed for `region`.
/// If a FAP (Fast Add Peer) snapshot with the same (index, term) as the
/// committed snapshot is still persisted for this region, it is no longer
/// useful and is cleared on the engine-store side.
pub fn on_apply_snapshot_committed(
    &self,
    region: &Region,
    peer_id: u64,
    snap_key: &store::SnapKey,
    _: Option<&store::Snapshot>,
) {
    // Test hook: skip the whole cleanup when this fail point is active.
    fail::fail_point!("on_ob_apply_snapshot_committed", |_| {
        return;
    });
    // Only check for a leftover FAP snapshot when UniPS is enabled.
    #[allow(unused_mut)]
    let mut should_check_fap_snapshot = self.packed_envs.engine_store_cfg.enable_unips;
    // The fail point is wrapped in an immediately-invoked closure so the
    // early `return` it expands to only exits the closure, letting the flag
    // assignment take effect while execution continues below.
    #[allow(clippy::redundant_closure_call)]
    (|| {
        fail::fail_point!("on_apply_snapshot_committed_allow_no_unips", |_| {
            // UniPS can't provide a snapshot currently
            should_check_fap_snapshot = true;
        });
    })();
    #[allow(clippy::collapsible_if)]
    if should_check_fap_snapshot {
        // Clear only when a FAP snapshot is actually persisted for this exact
        // (region, peer, index, term); otherwise it may still be needed.
        if self.engine_store_server_helper.query_fap_snapshot_state(
            region.get_id(),
            peer_id,
            snap_key.idx,
            snap_key.term,
        ) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted
        {
            info!("remove fap snapshot on success";
                "peer_id" => peer_id,
                "region_id" => region.get_id(),
            );
            self.engine_store_server_helper
                .clear_fap_snapshot(region.get_id(), 1); // 1 for success
        }
    }
}

pub fn cancel_apply_snapshot(&self, region_id: u64, peer_id: u64) {
info!("start cancel apply snapshot";
"peer_id" => peer_id,
Expand Down
12 changes: 12 additions & 0 deletions proxy_components/engine_store_ffi/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,18 @@ impl<T: Transport + 'static, ER: RaftEngine> ApplySnapshotObserver for TiFlashOb
forwarder.cancel_apply_snapshot(region_id, peer_id)
}
}

fn on_apply_snapshot_committed(
&self,
ob_ctx: &mut ObserverContext<'_>,
id: u64,
k: &raftstore::store::SnapKey,
s: Option<&raftstore::store::Snapshot>,
) {
if let Some(ref forwarder) = *self.forwarder.read().expect("poisoned") {
forwarder.on_apply_snapshot_committed(ob_ctx.region(), id, k, s)
}
}
}

impl<T: Transport + 'static, ER: RaftEngine> RoleObserver for TiFlashObserver<T, ER> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,8 @@ pub(crate) unsafe fn write_snapshot_to_db_data_by_engine(
info!("mock flush snapshot to engine";
"region" => ?region.region,
"store_id" => store_id,
"reason" => reason
"reason" => reason,
"apply_state" => ?region.apply_state
);
let mut batch = kv.rocks.log_batch(1000);
let local_state = kvproto::raft_serverpb::RaftLocalState::default();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,21 @@ pub(crate) unsafe extern "C" fn ffi_query_fap_snapshot_state(
match (*store.engine_store_server).tmp_fap_regions.get(&region_id) {
Some(e) => {
if index == 0 && term == 0 {
debug!("ffi_query_fap_snapshot_state: found unchecked snapshot";
info!("ffi_query_fap_snapshot_state: found unchecked snapshot";
"region_id" => region_id,
"index" => index,
"term" => term,
);
interfaces_ffi::FapSnapshotState::Persisted
} else if e.apply_state.get_applied_index() == index && e.applied_term == term {
debug!("ffi_query_fap_snapshot_state: found matched snapshot";
info!("ffi_query_fap_snapshot_state: found matched snapshot";
"region_id" => region_id,
"index" => index,
"term" => term,
);
interfaces_ffi::FapSnapshotState::Persisted
} else {
debug!("ffi_query_fap_snapshot_state: mismatch snapshot";
info!("ffi_query_fap_snapshot_state: mismatch snapshot";
"region_id" => region_id,
"index" => index,
"term" => term,
Expand Down Expand Up @@ -63,10 +63,12 @@ pub(crate) unsafe extern "C" fn ffi_kvstore_region_exists(
pub(crate) unsafe extern "C" fn ffi_clear_fap_snapshot(
arg1: *mut interfaces_ffi::EngineStoreServerWrap,
region_id: u64,
state: u64,
) {
let store = into_engine_store_server_wrap(arg1);
debug!("ffi_clear_fap_snapshot clean";
"region_id" => region_id
"region_id" => region_id,
"state" => state
);
(*store.engine_store_server)
.tmp_fap_regions
Expand All @@ -78,15 +80,25 @@ pub(crate) unsafe extern "C" fn ffi_apply_fap_snapshot(
region_id: u64,
peer_id: u64,
assert_exist: u8,
_index: u64,
index: u64,
_term: u64,
) -> u8 {
let store = into_engine_store_server_wrap(arg1);
let new_region = match (*store.engine_store_server)
.tmp_fap_regions
.remove(&region_id)
{
Some(e) => e,
if let Some(target_region) = (*store.engine_store_server).kvstore.get_mut(&region_id) {
if target_region.apply_state.get_applied_index() != index {
panic!(
"Don't support FAP for an existing region region_id={} peed_id={} snap_index={} index={}",
region_id,
peer_id,
index,
target_region.apply_state.get_applied_index()
);
}
}
}
let new_region = match (*store.engine_store_server).tmp_fap_regions.get(&region_id) {
Some(e) => e.clone(),
None => {
info!("not a fap snapshot";
"region_id" => region_id,
Expand All @@ -99,6 +111,11 @@ pub(crate) unsafe extern "C" fn ffi_apply_fap_snapshot(
return 0;
}
};
info!("fap snapshot: insert into kvstore";
"region_id" => region_id,
"peer_id" => peer_id,
"assert_exist" => assert_exist,
);
(*store.engine_store_server)
.kvstore
.insert(region_id, new_region);
Expand Down Expand Up @@ -166,6 +183,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
});
0
})() != 0;
let fail_after_write_mismatch: bool = (|| {
fail::fail_point!("fap_mock_fail_after_write_mismatch", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
t
});
0
})() != 0;
debug!("recover from remote peer: enter from {} to {}", from_store, store_id; "region_id" => region_id);

if force_wait_for_data {
Expand Down Expand Up @@ -255,7 +279,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
debug!("recover from remote peer: meta from {} to {}", from_store, store_id; "region_id" => region_id);
// Must first dump meta then data, otherwise data may lag behind.
// We can see a raft log hole at applied_index otherwise.
let apply_state: RaftApplyState = match general_get_apply_state(
let mut apply_state: RaftApplyState = match general_get_apply_state(
&source_engines.kv,
region_id,
) {
Expand All @@ -266,6 +290,9 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
return;
}
};
if fail_after_write_mismatch {
apply_state.set_applied_index(apply_state.get_applied_index() - 1);
}
new_region.set_applied(apply_state.get_applied_index(), source_region.applied_term);
debug!("recover from remote peer: begin data from {} to {}", from_store, store_id;
"region_id" => region_id,
Expand All @@ -283,7 +310,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
return;
}

if fail_after_write {
if fail_after_write || fail_after_write_mismatch {
let mut raft_wb = target_engines.raft.log_batch(1024);
let mut entries: Vec<raft::eraftpb::Entry> = Default::default();
target_engines
Expand All @@ -304,6 +331,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
target_engines.raft.gc(region_id, from, to, &mut raft_wb).unwrap();
target_engines.raft.consume(&mut raft_wb, true).unwrap();
}

let apply_state_bytes = apply_state.write_to_bytes().unwrap();
let region_bytes = region_local_state.get_region().write_to_bytes().unwrap();
let apply_state_ptr = create_cpp_str(Some(apply_state_bytes));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ pub unsafe extern "C" fn ffi_apply_pre_handled_snapshot(
.tmp_fap_regions
.remove(&region_id);

info!("regular snapshot: insert into kvstore";
"region_id" => region_id,
"node_id" => node_id,
);
let _ = &(*store.engine_store_server)
.kvstore
.insert(region_id, Box::new(region_meta.region.take().unwrap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ impl EngineStoreServerWrap {
let new_region =
make_new_region(Some(region_meta.clone()), Some(node_id));

info!("batch split: insert into kvstore";
"region_id" => region_id,
"node_id" => node_id,
);
// No need to split data because all KV are stored in the same
// RocksDB. TODO But we still need
// to clean all in-memory data.
Expand All @@ -143,6 +147,12 @@ impl EngineStoreServerWrap {
let tikv_region = resp.get_split().get_left();

let _target = req.prepare_merge.as_ref().unwrap().target.as_ref();

info!("merge: insert into kvstore";
"region_id" => region_id,
"node_id" => node_id,
);

let region_meta = &mut (engine_store_server
.kvstore
.get_mut(&region_id)
Expand Down Expand Up @@ -279,7 +289,10 @@ impl EngineStoreServerWrap {
// Currently in tests, we don't handle commands like BatchSplit,
// and sometimes we don't bootstrap region 1,
// so it is normal if we find no region.
warn!("region {} not found, create for {}", region_id, node_id);
warn!(
"region {} not found, create for {}, insert into kvstore",
region_id, node_id
);
let new_region = v.insert(Default::default());
assert!((*self.engine_store_server).kvstore.contains_key(&region_id));
do_handle_admin_raft_cmd(new_region, &mut (*self.engine_store_server))
Expand Down
4 changes: 2 additions & 2 deletions proxy_components/proxy_ffi/src/engine_store_helper_impls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,9 @@ impl EngineStoreServerHelper {
unsafe { (self.fn_kvstore_region_exists.into_inner())(self.inner, region_id) }
}

pub fn clear_fap_snapshot(&self, region_id: u64) {
pub fn clear_fap_snapshot(&self, region_id: u64, state: u64) {
debug_assert!(self.fn_clear_fap_snapshot.is_some());
unsafe { (self.fn_clear_fap_snapshot.into_inner())(self.inner, region_id) }
unsafe { (self.fn_clear_fap_snapshot.into_inner())(self.inner, region_id, state) }
}

pub fn handle_ingest_sst(
Expand Down
Loading