diff --git a/src/braft/node.cpp b/src/braft/node.cpp index cd702f42..f372b683 100644 --- a/src/braft/node.cpp +++ b/src/braft/node.cpp @@ -2226,11 +2226,32 @@ int NodeImpl::handle_request_vote_request(const RequestVoteRequest* request, LogId last_log_id = _log_manager->last_log_id(true); lck.lock(); - // vote need ABA check after unlock&lock + // Term may have changed while mutex was released for last_log_id I/O. + // Since term is monotonically increasing, handle three cases: if (previous_term != _current_term) { - LOG(WARNING) << "node " << _group_id << ":" << _server_id - << " raise term " << _current_term << " when get last_log_id"; - break; + if (_current_term < previous_term) { + LOG(ERROR) << "node " << _group_id << ":" << _server_id + << " term regressed from " << previous_term + << " to " << _current_term + << " which should never happen"; + break; + } + if (_current_term > request->term()) { + LOG(WARNING) << "node " << _group_id << ":" << _server_id + << " term changed from " << previous_term + << " to " << _current_term + << " (> request term " << request->term() + << ") while getting last_log_id, reject stale vote"; + break; + } + // _current_term advanced but still <= request->term(); + // the vote request remains valid -- continue processing. 
+ LOG(INFO) << "node " << _group_id << ":" << _server_id + << " term changed from " << previous_term + << " to " << _current_term + << " (<= request term " << request->term() + << "), vote request still valid"; + previous_term = _current_term; } bool log_is_ok = (LogId(request->last_log_index(), request->last_log_term()) diff --git a/test/test_node.cpp b/test/test_node.cpp index 407e16af..2e813761 100644 --- a/test/test_node.cpp +++ b/test/test_node.cpp @@ -3566,6 +3566,134 @@ TEST_P(NodeTest, readonly) { cluster.stop_all(); } +TEST_P(NodeTest, VoteNotRejectedAfterTermAdvanceDuringUnlock) { + // Regression test for the race condition in handle_request_vote_request: + // When _current_term advances during the unlock window (for last_log_id I/O) + // but remains <= request->term(), the vote should NOT be rejected. + // + // Scenario: follower at term T receives RequestVote(term=T+1). While mutex + // is released, a heartbeat arrives and bumps _current_term to T+1. The old + // code unconditionally rejected this; the fix allows it to proceed. + + std::vector<braft::PeerId> peers; + for (int i = 0; i < 3; i++) { + braft::PeerId peer; + peer.addr.ip = butil::my_ip(); + peer.addr.port = 5006 + i; + peer.idx = 0; + peers.push_back(peer); + } + + // Large election timeout prevents background elections from racing with + // our manual state manipulation below. 
+ Cluster cluster("unittest", peers, 30000); + for (size_t i = 0; i < peers.size(); i++) { + ASSERT_EQ(0, cluster.start(peers[i].addr)); + } + + cluster.wait_leader(); + braft::Node* leader = cluster.leader(); + ASSERT_TRUE(leader != NULL); + LOG(WARNING) << "leader is " << leader->node_id(); + + bthread::CountdownEvent cond(10); + for (int i = 0; i < 10; i++) { + butil::IOBuf data; + char data_buf[128]; + snprintf(data_buf, sizeof(data_buf), "hello: %d", i + 1); + data.append(data_buf); + braft::Task task; + task.data = &data; + task.done = NEW_APPLYCLOSURE(&cond, 0); + leader->apply(task); + } + cond.wait(); + cluster.ensure_same(); + + std::vector<braft::Node*> followers; + cluster.followers(&followers); + ASSERT_FALSE(followers.empty()); + + braft::Node* follower = followers[0]; + braft::NodeImpl* impl = follower->_impl; + + // Pick a peer that is neither the follower nor the leader as "candidate" + braft::PeerId candidate_peer; + for (size_t i = 0; i < peers.size(); i++) { + if (peers[i] != follower->node_id().peer_id && + peers[i] != leader->node_id().peer_id) { + candidate_peer = peers[i]; + break; + } + } + + // Capture last_log_id outside the lock (it may do I/O internally) + braft::LogId last_log_id = impl->_log_manager->last_log_id(true); + + // --- Positive case: vote granted when request->term() == _current_term --- + // Simulate a heartbeat having already bumped the follower's term to T+1, + // then a RequestVote(T+1) arrives -- the vote should be granted. 
+ int64_t request_term; + impl->_mutex.lock(); + request_term = impl->_current_term + 1; + butil::Status st; + st.set_error(braft::EHIGHERTERMREQUEST, "Simulated heartbeat term bump"); + impl->step_down(request_term, false, st); + impl->_voted_id.reset(); + ASSERT_EQ(request_term, impl->_current_term); + impl->_mutex.unlock(); + + { + braft::RequestVoteRequest request; + request.set_group_id("unittest"); + request.set_server_id(candidate_peer.to_string()); + request.set_peer_id(follower->node_id().peer_id.to_string()); + request.set_term(request_term); + request.set_last_log_term(last_log_id.term); + request.set_last_log_index(last_log_id.index); + + braft::RequestVoteResponse response; + int rc = impl->handle_request_vote_request(&request, &response); + ASSERT_EQ(0, rc); + + LOG(WARNING) << "VoteRace positive: granted=" << response.granted() + << " response_term=" << response.term() + << " request_term=" << request_term; + ASSERT_TRUE(response.granted()); + ASSERT_EQ(request_term, response.term()); + } + + // --- Negative case: vote rejected when current term exceeds request term --- + const int64_t higher_term = request_term + 2; + impl->_mutex.lock(); + st.set_error(braft::EHIGHERTERMREQUEST, "Simulated further term bump"); + impl->step_down(higher_term, false, st); + impl->_voted_id.reset(); + ASSERT_EQ(higher_term, impl->_current_term); + impl->_mutex.unlock(); + + { + braft::RequestVoteRequest request; + request.set_group_id("unittest"); + request.set_server_id(candidate_peer.to_string()); + request.set_peer_id(follower->node_id().peer_id.to_string()); + request.set_term(request_term); // stale term, lower than current + request.set_last_log_term(last_log_id.term); + request.set_last_log_index(last_log_id.index); + + braft::RequestVoteResponse response; + int rc = impl->handle_request_vote_request(&request, &response); + ASSERT_EQ(0, rc); + + LOG(WARNING) << "VoteRace negative: granted=" << response.granted() + << " response_term=" << response.term(); + 
ASSERT_FALSE(response.granted()); + } + + LOG(WARNING) << "cluster stop"; + cluster.stop_all(); +} + INSTANTIATE_TEST_CASE_P(NodeTestWithoutPipelineReplication, NodeTest, ::testing::Values("NoReplcation"));