Skip to content

Commit 5ff6be1

Browse files
committed
Leader takes into account previous log when checking whether to trigger snapshot
1 parent 0ff6c20 commit 5ff6be1

4 files changed

Lines changed: 40 additions & 21 deletions

File tree

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,10 @@ private TermIndex shouldNotifyToInstallSnapshot() {
868868
return firstAvailable;
869869
}
870870

871+
if (followerNextIndex == leaderStartIndex && getPrevious(followerNextIndex) == null) {
872+
return firstAvailable;
873+
}
874+
871875
return null;
872876
}
873877

ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,35 @@ default RaftPeerId getFollowerId() {
134134
/** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
135135
Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
136136

137+
/**
138+
* Get the previous {@link TermIndex} for the given next index.
139+
* This is used to set the previous log entry in AppendEntries requests.
140+
*
141+
* @return the previous {@link TermIndex}, or null if unavailable
142+
* (e.g. the entry has been purged and the snapshot does not cover it).
143+
*/
144+
default TermIndex getPrevious(long nextIndex) {
145+
if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
146+
return null;
147+
}
148+
149+
final long previousIndex = nextIndex - 1;
150+
final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
151+
if (previous != null) {
152+
return previous;
153+
}
154+
155+
final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
156+
if (snapshot != null) {
157+
final TermIndex snapshotTermIndex = snapshot.getTermIndex();
158+
if (snapshotTermIndex.getIndex() == previousIndex) {
159+
return snapshotTermIndex;
160+
}
161+
}
162+
163+
return null;
164+
}
165+
137166
/**
138167
* Should this {@link LogAppender} send a snapshot to the follower?
139168
*
@@ -164,6 +193,9 @@ default SnapshotInfo shouldInstallSnapshot() {
164193
if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
165194
return snapshot;
166195
}
196+
if (followerNextIndex == logStartIndex && getPrevious(followerNextIndex) == null) {
197+
return snapshot;
198+
}
167199
}
168200
return null;
169201
}

ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java

Lines changed: 0 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -165,27 +165,6 @@ public boolean hasPendingDataRequests() {
165165
return false;
166166
}
167167

168-
private TermIndex getPrevious(long nextIndex) {
169-
if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
170-
return null;
171-
}
172-
173-
final long previousIndex = nextIndex - 1;
174-
final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
175-
if (previous != null) {
176-
return previous;
177-
}
178-
179-
final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
180-
if (snapshot != null) {
181-
final TermIndex snapshotTermIndex = snapshot.getTermIndex();
182-
if (snapshotTermIndex.getIndex() == previousIndex) {
183-
return snapshotTermIndex;
184-
}
185-
}
186-
187-
return null;
188-
}
189168

190169
protected long getNextIndexForInconsistency(long requestFirstIndex, long replyNextIndex) {
191170
long next = replyNextIndex;

ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,5 +309,9 @@ void runTestNewAppendEntriesRequestAfterPurge(CLUSTER cluster,
309309

310310
Assertions.assertEquals(targetNextIndex, appender.getFollower().getNextIndex(),
311311
"Follower nextIndex should remain unchanged");
312+
313+
Assertions.assertNotNull(appender.shouldInstallSnapshot(),
314+
"shouldInstallSnapshot should return non-null when followerNextIndex ("
315+
+ targetNextIndex + ") and previous entry has been purged");
312316
}
313317
}

0 commit comments

Comments
 (0)