Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions src/braft/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2226,11 +2226,32 @@ int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request,
LogId last_log_id = _log_manager->last_log_id(true);
lck.lock();

// vote need ABA check after unlock&lock
// Term may have changed while mutex was released for last_log_id I/O.
// Since term is monotonically increasing, handle three cases:
if (previous_term != _current_term) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " raise term " << _current_term << " when get last_log_id";
break;
if (_current_term < previous_term) {
LOG(ERROR) << "node " << _group_id << ":" << _server_id
<< " term regressed from " << previous_term
<< " to " << _current_term
<< " which should never happen";
break;
}
if (_current_term > request->term()) {
LOG(WARNING) << "node " << _group_id << ":" << _server_id
<< " term changed from " << previous_term
<< " to " << _current_term
<< " (> request term " << request->term()
<< ") while getting last_log_id, reject stale vote";
break;
}
// _current_term advanced but still <= request->term();
// the vote request remains valid -- continue processing.
LOG(INFO) << "node " << _group_id << ":" << _server_id
<< " term changed from " << previous_term
<< " to " << _current_term
<< " (<= request term " << request->term()
<< "), vote request still valid";
previous_term = _current_term;
}

bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term())
Expand Down
128 changes: 128 additions & 0 deletions test/test_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3566,6 +3566,134 @@ TEST_P(NodeTest, readonly) {
cluster.stop_all();
}

TEST_P(NodeTest, VoteNotRejectedAfterTermAdvanceDuringUnlock) {
    // Regression test for the race condition in handle_request_vote_request:
    // When _current_term advances during the unlock window (for last_log_id I/O)
    // but remains <= request->term(), the vote should NOT be rejected.
    //
    // Scenario: follower at term T receives RequestVote(term=T+1). While mutex
    // is released, a heartbeat arrives and bumps _current_term to T+1. The old
    // code unconditionally rejected this; the fix allows it to proceed.
    //
    // NOTE(review): this is a white-box test. Instead of reproducing the real
    // thread interleaving, it reaches into Node::_impl and mutates
    // _current_term / _voted_id directly under _mutex, then calls
    // handle_request_vote_request with a hand-built request.

    std::vector<braft::PeerId> peers;
    for (int i = 0; i < 3; i++) {
        braft::PeerId peer;
        peer.addr.ip = butil::my_ip();
        // NOTE(review): assumes ports 5006..5008 are free and do not collide
        // with other tests in this binary -- confirm against the suite's
        // port-allocation convention.
        peer.addr.port = 5006 + i;
        peer.idx = 0;
        peers.push_back(peer);
    }

    // Large election timeout prevents background elections from racing with
    // our manual state manipulation below.
    Cluster cluster("unittest", peers, 30000);
    for (size_t i = 0; i < peers.size(); i++) {
        ASSERT_EQ(0, cluster.start(peers[i].addr));
    }

    cluster.wait_leader();
    braft::Node* leader = cluster.leader();
    ASSERT_TRUE(leader != NULL);
    LOG(WARNING) << "leader is " << leader->node_id();

    // Apply a batch of entries so the followers have a non-trivial log;
    // cond counts down once per successfully applied task.
    bthread::CountdownEvent cond(10);
    for (int i = 0; i < 10; i++) {
        butil::IOBuf data;
        char data_buf[128];
        snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1);
        data.append(data_buf);
        braft::Task task;
        task.data = &data;
        task.done = NEW_APPLYCLOSURE(&cond, 0);
        leader->apply(task);
    }
    cond.wait();
    cluster.ensure_same();

    std::vector<braft::Node*> followers;
    cluster.followers(&followers);
    ASSERT_FALSE(followers.empty());

    braft::Node* follower = followers[0];
    braft::NodeImpl* impl = follower->_impl;

    // Pick a peer that is neither the follower nor the leader as "candidate"
    braft::PeerId candidate_peer;
    for (size_t i = 0; i < peers.size(); i++) {
        if (peers[i] != follower->node_id().peer_id &&
            peers[i] != leader->node_id().peer_id) {
            candidate_peer = peers[i];
            break;
        }
    }

    // Capture last_log_id outside the lock (it may do I/O internally)
    braft::LogId last_log_id = impl->_log_manager->last_log_id(true);

    // --- Positive case: vote granted when request->term() == _current_term ---
    // Simulate a heartbeat having already bumped the follower's term to T+1,
    // then a RequestVote(T+1) arrives -- the vote should be granted.
    int64_t request_term;
    impl->_mutex.lock();
    request_term = impl->_current_term + 1;
    butil::Status st;
    st.set_error(braft::EHIGHERTERMREQUEST, "Simulated heartbeat term bump");
    // step_down advances _current_term the same way a real higher-term
    // heartbeat would; _voted_id.reset() clears any vote already cast at the
    // new term so the grant path is reachable.
    impl->step_down(request_term, false, st);
    impl->_voted_id.reset();
    ASSERT_EQ(request_term, impl->_current_term);
    impl->_mutex.unlock();

    {
        braft::RequestVoteRequest request;
        request.set_group_id("unittest");
        request.set_server_id(candidate_peer.to_string());
        request.set_peer_id(follower->node_id().peer_id.to_string());
        request.set_term(request_term);
        // Use the follower's own last log id so the candidate's log is
        // at-least-as-up-to-date and cannot be the reason for rejection.
        request.set_last_log_term(last_log_id.term);
        request.set_last_log_index(last_log_id.index);

        braft::RequestVoteResponse response;
        int rc = impl->handle_request_vote_request(&request, &response);
        ASSERT_EQ(0, rc);  // rc is the RPC-level result; vote outcome is in response

        LOG(WARNING) << "VoteRace positive: granted=" << response.granted()
                     << " response_term=" << response.term()
                     << " request_term=" << request_term;
        ASSERT_TRUE(response.granted());
        ASSERT_EQ(request_term, response.term());
    }

    // --- Negative case: vote rejected when current term exceeds request term ---
    // Bump the term two past the request's term, then replay a request that is
    // now stale; the fixed code must still reject it.
    const int64_t higher_term = request_term + 2;
    impl->_mutex.lock();
    st.set_error(braft::EHIGHERTERMREQUEST, "Simulated further term bump");
    impl->step_down(higher_term, false, st);
    impl->_voted_id.reset();
    ASSERT_EQ(higher_term, impl->_current_term);
    impl->_mutex.unlock();

    {
        braft::RequestVoteRequest request;
        request.set_group_id("unittest");
        request.set_server_id(candidate_peer.to_string());
        request.set_peer_id(follower->node_id().peer_id.to_string());
        request.set_term(request_term); // stale term, lower than current
        request.set_last_log_term(last_log_id.term);
        request.set_last_log_index(last_log_id.index);

        braft::RequestVoteResponse response;
        int rc = impl->handle_request_vote_request(&request, &response);
        ASSERT_EQ(0, rc);

        LOG(WARNING) << "VoteRace negative: granted=" << response.granted()
                     << " response_term=" << response.term();
        ASSERT_FALSE(response.granted());
    }

    LOG(WARNING) << "cluster stop";
    cluster.stop_all();
}

// Instantiate the value-parameterized NodeTest suite for the
// no-pipeline-replication configuration.
// NOTE(review): "NoReplcation" looks like a typo for "NoReplication", but it
// is a runtime parameter value presumably matched elsewhere in the suite --
// confirm all usages before renaming it.
INSTANTIATE_TEST_CASE_P(NodeTestWithoutPipelineReplication,
                        NodeTest,
                        ::testing::Values("NoReplcation"));
Expand Down
Loading