Skip to content

Commit 77fb832

Browse files
author
yawzhang
committed
Avoid dup-append logs for committing logs.
This change avoids the following corner case: T1: lsn=100 is executing handle_commit. T2: the leader requests lsn=100 to be appended again, triggering raft_event, which retrieves the existing rreq from the map. T3: lsn=100 is committed; the rreq is removed from the map and cleared. T4: while raft_event is still processing, the rreq has already been cleared, preventing further processing.
1 parent 5822372 commit 77fb832

3 files changed

Lines changed: 25 additions & 15 deletions

File tree

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomestoreConan(ConanFile):
1111
name = "homestore"
12-
version = "6.17.2"
12+
version = "6.17.3"
1313

1414
homepage = "https://github.com/eBay/Homestore"
1515
description = "HomeStore Storage Engine"

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,13 +1414,6 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
14141414
m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), {rreq->local_blkid()}, rreq);
14151415
}
14161416

1417-
if (!recovery) {
1418-
auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn());
1419-
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn,
1420-
"Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}",
1421-
rreq->lsn(), prev_lsn);
1422-
}
1423-
14241417
// Remove the request from repl_key map only after the listener operation is completed.
14251418
// This prevents unnecessary block allocation in the following scenario:
14261419
// 1. The follower processes a commit for LSN 100 and remove rreq from rep_key map before listener commit
@@ -1431,6 +1424,13 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
14311424
m_repl_key_req_map.erase(rreq->rkey());
14321425
// Remove the request from lsn map.
14331426
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);
1427+
1428+
if (!recovery) {
1429+
auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn());
1430+
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn,
1431+
"Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}",
1432+
rreq->lsn(), prev_lsn);
1433+
}
14341434
if (!rreq->is_proposer()) rreq->clear();
14351435
}
14361436

@@ -1856,18 +1856,21 @@ nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type,
18561856
m_commit_upto_lsn.load(), raft_req->get_commit_idx());
18571857

18581858
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
1859-
auto last_commit_lsn = uint64_cast(get_last_commit_lsn());
1859+
auto last_lsn = uint64_cast(m_data_journal->next_slot() - 1);
1860+
auto last_term = last_lsn == 0 ? 0
1861+
: m_data_journal->term_at(last_lsn);
18601862
for (unsigned long i = 0; i < entries.size(); i++) {
18611863
auto& entry = entries[i];
18621864
auto lsn = start_lsn + i;
18631865
auto term = entry->get_term();
18641866
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
18651867
if (entry->get_buf_ptr()->size() == 0) { continue; }
1866-
// skipping localize for already committed log(dup), they anyway will be discard
1867-
// by nuraft before append_log.
1868-
if (lsn <= last_commit_lsn) {
1869-
RD_LOGT(NO_TRACE_ID, "Raft channel: term {}, lsn {}, skipping dup, last_commit_lsn {}", term, lsn,
1870-
last_commit_lsn);
1868+
// skipping localize for already appended log with the same term(dup), they anyway will be discard
1869+
// or rollback by nuraft before append_log, this way can avoid dup-append for committing logs
1870+
if (lsn <= last_lsn && term == last_term) {
1871+
// This is a duplicate log that we have already appended with the same term, so skip it.
1872+
RD_LOGT(NO_TRACE_ID, "Raft channel: term {}, lsn {}, skipping dup, last_lsn {}, last_term {}", term, lsn,
1873+
last_lsn, last_term);
18711874
continue;
18721875
}
18731876
// Those LSNs already in logstore but not yet committed, will be dedup here,

src/tests/test_common/raft_repl_test_base.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -614,7 +614,14 @@ class RaftReplDevTestBase : public testing::Test {
614614
data_size == nullptr ? std::abs(std::lround(num_blks_gen(g_re))) * block_size : *data_size;
615615
this->generate_writes(size, block_size, db);
616616
});
617-
if (wait_for_commit) { g_helper->runner().execute().get(); }
617+
if (wait_for_commit) {
618+
g_helper->runner().execute().get();
619+
// Wait for the related rreqs to be removed from the map. This avoids rreqs being reused in this case:
620+
// 1. follower committing rreq
621+
// 2. follower received a duplicated append log entries from leader, then get the rreq from map
622+
// 3. follower finished commit, clear rreq, then the append thread hold an empty rreq.
623+
std::this_thread::sleep_for(std::chrono::seconds{1});
624+
}
618625
break;
619626
} else {
620627
LOGINFO("{} entries were written on the leader_uuid={} my_uuid={}", num_entries,

0 commit comments

Comments
 (0)