Skip to content

Commit 39acebf

Browse files
RkGritszetszwo
authored andcommitted
RATIS-2278. Follower Fails to Append Entries Due to Index Validation Race Condition in NavigableIndices (apache#1248)
1 parent 2c16ccb commit 39acebf

2 files changed

Lines changed: 16 additions & 8 deletions

File tree

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1634,7 +1634,10 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16341634
}
16351635
private CompletableFuture<Void> appendLog(List<LogEntryProto> entries) {
16361636
final List<ConsecutiveIndices> entriesTermIndices = ConsecutiveIndices.convert(entries);
1637-
appendLogTermIndices.append(entriesTermIndices);
1637+
if (!appendLogTermIndices.append(entriesTermIndices)) {
1638+
// index already exists, return the last future
1639+
return appendLogFuture.get();
1640+
}
16381641

16391642
return appendLogFuture.updateAndGet(f -> f.thenCompose(
16401643
ignored -> JavaUtils.allOf(state.getLog().append(entries))))

ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,20 @@ synchronized Long getTerm(long index) {
136136
return floorEntry.getValue().getTerm(index);
137137
}
138138

139-
synchronized void append(List<ConsecutiveIndices> entriesTermIndices) {
140-
for(ConsecutiveIndices indices : entriesTermIndices) {
141-
// validate startIndex
142-
final Map.Entry<Long, ConsecutiveIndices> lastEntry = map.lastEntry();
143-
if (lastEntry != null) {
144-
Preconditions.assertSame(lastEntry.getValue().getNextIndex(), indices.startIndex, "startIndex");
139+
synchronized boolean append(List<ConsecutiveIndices> entriesTermIndices) {
140+
for(int i = 0; i < entriesTermIndices.size(); i++) {
141+
final ConsecutiveIndices indices = entriesTermIndices.get(i);
142+
final ConsecutiveIndices previous = map.put(indices.startIndex, indices);
143+
if (previous != null) {
144+
// index already exists, revert this append
145+
map.put(previous.startIndex, previous);
146+
for(int j = 0; j < i; j++) {
147+
map.remove(entriesTermIndices.get(j).startIndex);
148+
}
149+
return false;
145150
}
146-
map.put(indices.startIndex, indices);
147151
}
152+
return true;
148153
}
149154

150155
synchronized void removeExisting(List<ConsecutiveIndices> entriesTermIndices) {

0 commit comments

Comments
 (0)