Skip to content

Commit d7370f8

Browse files
authored
RATIS-2378. fix listener role transition (#1331)
1 parent 6e1e15f commit d7370f8

4 files changed

Lines changed: 32 additions & 8 deletions

File tree

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -591,7 +591,7 @@ private synchronized CompletableFuture<Void> changeToFollower(
591591
throw new IllegalStateException("Unexpected role " + old);
592592
}
593593
CompletableFuture<Void> future = CompletableFuture.completedFuture(null);
594-
if ((old != RaftPeerRole.FOLLOWER || force) && old != RaftPeerRole.LISTENER) {
594+
if (shouldSetFollower(old, force)) {
595595
setRole(RaftPeerRole.FOLLOWER, reason);
596596
if (old == RaftPeerRole.LEADER) {
597597
future = role.shutdownLeaderState(false)
@@ -607,7 +607,7 @@ private synchronized CompletableFuture<Void> changeToFollower(
607607
state.setLeader(null, reason);
608608
} else if (old == RaftPeerRole.CANDIDATE) {
609609
future = role.shutdownLeaderElection();
610-
} else if (old == RaftPeerRole.FOLLOWER) {
610+
} else if (old == RaftPeerRole.FOLLOWER || old == RaftPeerRole.LISTENER) {
611611
future = role.shutdownFollowerState();
612612
}
613613

@@ -620,6 +620,14 @@ private synchronized CompletableFuture<Void> changeToFollower(
620620
return future;
621621
}
622622

623+
private boolean shouldSetFollower(RaftPeerRole old, boolean force) {
624+
if (old == RaftPeerRole.LISTENER) {
625+
final RaftConfigurationImpl conf = state.getRaftConf();
626+
return conf.isStable() && conf.containsInConf(getId());
627+
}
628+
return old != RaftPeerRole.FOLLOWER || force;
629+
}
630+
623631
synchronized CompletableFuture<Void> changeToFollowerAndPersistMetadata(
624632
long newTerm,
625633
boolean allowListener,

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,12 @@ boolean isConfCommitted() {
376376
return getLog().getLastCommittedIndex() >= getRaftConf().getLogEntryIndex();
377377
}
378378

379-
void setRaftConf(LogEntryProto entry) {
379+
private boolean setRaftConf(LogEntryProto entry) {
380380
if (entry.hasConfigurationEntry()) {
381381
setRaftConf(LogProtoUtils.toRaftConfiguration(entry));
382+
return true;
382383
}
384+
return false;
383385
}
384386

385387
void setRaftConf(RaftConfiguration conf) {
@@ -397,10 +399,19 @@ void truncate(long logIndex) {
397399
configurationManager.removeConfigurations(logIndex);
398400
}
399401

400-
void updateConfiguration(List<LogEntryProto> entries) {
401-
if (entries != null && !entries.isEmpty()) {
402-
configurationManager.removeConfigurations(entries.get(0).getIndex());
403-
entries.forEach(this::setRaftConf);
402+
void updateConfiguration(List<LogEntryProto> entries) throws IOException {
403+
if (entries == null || entries.isEmpty()) {
404+
return;
405+
}
406+
configurationManager.removeConfigurations(entries.get(0).getIndex());
407+
408+
boolean changed = false;
409+
for(LogEntryProto entry : entries) {
410+
changed |= setRaftConf(entry);
411+
}
412+
413+
if (changed && server.getRole().getCurrentRole() == RaftPeerRole.LISTENER) {
414+
server.changeToFollowerAndPersistMetadata(getCurrentTerm(), true, "setRaftConf").join();
404415
}
405416
}
406417

ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.slf4j.LoggerFactory;
4747

4848
import java.io.IOException;
49+
import java.util.Collections;
4950
import java.util.Optional;
5051
import java.util.concurrent.CompletableFuture;
5152
import java.util.concurrent.atomic.AtomicBoolean;
@@ -145,7 +146,7 @@ private InstallSnapshotReplyProto installSnapshotImpl(InstallSnapshotRequestProt
145146
state.truncate(proto.getIndex());
146147
if (!state.getRaftConf().equals(LogProtoUtils.toRaftConfiguration(proto))) {
147148
LOG.info("{}: set new configuration {} from snapshot", getMemberId(), ProtoUtils.shortDebugString(proto));
148-
state.setRaftConf(proto);
149+
state.updateConfiguration(Collections.singletonList(proto));
149150
state.writeRaftConfiguration(proto);
150151
server.getStateMachine().event().notifyConfigurationChanged(
151152
proto.getTerm(), proto.getIndex(), proto.getConfigurationEntry());

ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -556,6 +556,10 @@ public void testChangeListenerToFollower() throws Exception {
556556
assertTrue(reply.isSuccess());
557557
Collection<RaftPeer> peer = leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
558558
assertEquals(0, peer.size());
559+
560+
listeners = cluster.getListeners()
561+
.stream().map(RaftServer.Division::getPeer).collect(Collectors.toList());
562+
assertEquals(0, listeners.size());
559563
}
560564
cluster.shutdown();
561565
}

0 commit comments

Comments
 (0)