Skip to content

Commit 9bd82aa

Browse files
authored
RATIS-1960. Follower may be incorrectly marked as having caught up (#983)
1 parent 01b386b commit 9bd82aa

File tree

1 file changed

+34
-19
lines changed

1 file changed

+34
-19
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -500,16 +500,25 @@ PendingRequest startSetConfiguration(SetConfigurationRequest request, List<RaftP
500500
peersToBootStrap, listenersToBootStrap, new PeerConfiguration(peersInNewConf, listenersInNewConf));
501501
Collection<RaftPeer> newPeers = configurationStagingState.getNewPeers();
502502
Collection<RaftPeer> newListeners = configurationStagingState.getNewListeners();
503-
// set the staging state
504-
this.stagingState = configurationStagingState;
505-
506-
if (newPeers.isEmpty() && newListeners.isEmpty()) {
507-
applyOldNewConf();
503+
Collection<RaftPeer> allNew = newListeners.isEmpty()
504+
? newPeers
505+
: newPeers.isEmpty()
506+
? newListeners
507+
: Stream.concat(newPeers.stream(), newListeners.stream())
508+
.collect(Collectors.toList());
509+
510+
if (allNew.isEmpty()) {
511+
applyOldNewConf(configurationStagingState);
508512
} else {
509513
// update the LeaderState's sender list
510-
addAndStartSenders(newPeers);
511-
addAndStartSenders(newListeners);
514+
Collection<LogAppender> newAppenders = addSenders(allNew);
515+
516+
// set the staging state
517+
stagingState = configurationStagingState;
518+
519+
newAppenders.forEach(LogAppender::start);
512520
}
521+
513522
return pending;
514523
}
515524

@@ -579,14 +588,14 @@ private void commitIndexChanged() {
579588
notifySenders();
580589
}
581590

582-
private void applyOldNewConf() {
591+
private void applyOldNewConf(ConfigurationStagingState stage) {
583592
final ServerState state = server.getState();
584593
final RaftConfigurationImpl current = state.getRaftConf();
585-
final RaftConfigurationImpl oldNewConf= stagingState.generateOldNewConf(current, state.getLog().getNextIndex());
594+
final long nextIndex = state.getLog().getNextIndex();
595+
final RaftConfigurationImpl oldNewConf = stage.generateOldNewConf(current, nextIndex);
586596
// apply the (old, new) configuration to log, and use it as the current conf
587597
appendConfiguration(oldNewConf);
588598

589-
this.stagingState = null;
590599
notifySenders();
591600
}
592601

@@ -607,7 +616,7 @@ void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> prot
607616
@Override
608617
public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
609618
List<LogEntryProto> entries, TermIndex previous, long callId) {
610-
final boolean initializing = isCaughtUp(follower);
619+
final boolean initializing = !isCaughtUp(follower);
611620
final RaftPeerId targetId = follower.getId();
612621
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
613622
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
@@ -618,9 +627,13 @@ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follo
618627
* Update sender list for setConfiguration request
619628
*/
620629
private void addAndStartSenders(Collection<RaftPeer> newPeers) {
621-
if (!newPeers.isEmpty()) {
622-
addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
623-
}
630+
addSenders(newPeers).forEach(LogAppender::start);
631+
}
632+
633+
private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers) {
634+
return !newPeers.isEmpty()
635+
? addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false)
636+
: Collections.emptyList();
624637
}
625638

626639
private RaftPeer getPeer(RaftPeerId id) {
@@ -811,20 +824,22 @@ private void checkStaging() {
811824
} else {
812825
final long commitIndex = server.getState().getLog().getLastCommittedIndex();
813826
// check progress for the new followers
814-
final EnumSet<BootStrapProgress> reports = getLogAppenders()
827+
final List<FollowerInfoImpl> laggingFollowers = getLogAppenders()
815828
.map(LogAppender::getFollower)
816829
.filter(follower -> !isCaughtUp(follower))
830+
.map(FollowerInfoImpl.class::cast)
831+
.collect(Collectors.toList());
832+
final EnumSet<BootStrapProgress> reports = laggingFollowers.stream()
817833
.map(follower -> checkProgress(follower, commitIndex))
818834
.collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
819835
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
820836
stagingState.fail(BootStrapProgress.NOPROGRESS);
821837
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
822838
// all caught up!
823-
applyOldNewConf();
824-
getLogAppenders()
825-
.map(LogAppender::getFollower)
839+
applyOldNewConf(stagingState);
840+
this.stagingState = null;
841+
laggingFollowers.stream()
826842
.filter(f -> server.getRaftConf().containsInConf(f.getId()))
827-
.map(FollowerInfoImpl.class::cast)
828843
.forEach(FollowerInfoImpl::catchUp);
829844
}
830845
}

0 commit comments

Comments
 (0)