Skip to content

Commit cdfc66f

Browse files
committed
RATIS-2506. Delay consecutive GrpcLogAppender restart.
1 parent 01b6e4f commit cdfc66f

7 files changed

Lines changed: 102 additions & 27 deletions

File tree

ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static Throwable unwrapThrowable(Throwable t) {
9797
return unwrapped;
9898
}
9999
}
100-
return t;
100+
return JavaUtils.unwrapCompletionException(t);
101101
}
102102

103103
static IOException unwrapException(StatusRuntimeException se) {

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import java.util.concurrent.TimeUnit;
6464
import java.util.concurrent.atomic.AtomicLong;
6565

66+
import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString;
67+
6668
/**
6769
* A new log appender implementation using grpc bi-directional stream API.
6870
*/
@@ -127,31 +129,21 @@ boolean isError() {
127129
}
128130
}
129131

130-
static class ReplyState {
132+
class ReplyState {
131133
private boolean firstReplyReceived = false;
132-
private int errorCount = 0;
133134

134135
synchronized boolean isFirstReplyReceived() {
135136
return firstReplyReceived;
136137
}
137138

138-
synchronized int getErrorCount() {
139-
return errorCount;
140-
}
141-
142139
int process(AppendResult result) {
143140
return process(result == AppendResult.INCONSISTENCY? Event.APPEND_ENTRIES_INCONSISTENCY_REPLY
144141
: Event.APPEND_ENTRIES_REPLY);
145142
}
146143

147144
synchronized int process(Event event) {
148145
firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived);
149-
if (event.isError()) {
150-
errorCount++;
151-
} else {
152-
errorCount = 0;
153-
}
154-
return errorCount;
146+
return getFollower().getErrorState().updateErrorCount(event.isError());
155147
}
156148
}
157149

@@ -298,17 +290,25 @@ private boolean isSlowFollower() {
298290
private void mayWait() {
299291
// use lastSend time instead of lastResponse time
300292
try {
301-
getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
302-
TimeUnit.MILLISECONDS);
293+
// For errors, enforce a sleep that cannot be woken up by a signal
294+
sleepForErrors();
295+
// The normal await can be woken up by a signal
296+
getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
303297
} catch (InterruptedException ie) {
304-
LOG.warn(this + ": Wait interrupted by " + ie);
305298
Thread.currentThread().interrupt();
299+
LOG.warn("{} is interrupted", this, ie);
306300
}
307301
}
308302

309-
private long errorWaitTimeMs() {
310-
return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount)
311-
.getSleepTime().toLong(TimeUnit.MILLISECONDS);
303+
private void sleepForErrors() throws InterruptedException {
304+
final int errorCount = getFollower().getErrorState().getErrorCountToDelay();
305+
if (errorCount < 1) {
306+
return;
307+
}
308+
309+
final TimeDuration sleepTime = errorRetryWaitPolicy.handleAttemptFailure(() -> errorCount).getSleepTime();
310+
LOG.debug("{}: sleepForError {}, errorCount={}", this, sleepTime, errorCount);
311+
sleepTime.sleep();
312312
}
313313

314314
@Override
@@ -889,13 +889,9 @@ boolean isHeartbeat() {
889889

890890
@Override
891891
public String toString() {
892-
final String entries = entriesCount == 0? ""
893-
: entriesCount == 1? ",entry=" + firstEntry
894-
: ",entries=" + firstEntry + "..." + lastEntry;
895892
return JavaUtils.getClassSimpleName(getClass())
896893
+ ":cid=" + callId
897-
+ ",entriesCount=" + entriesCount
898-
+ entries;
894+
+ "," + toLogEntryTermIndexString(entriesCount, firstEntry, lastEntry);
899895
}
900896
}
901897

ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,33 @@ public interface FollowerInfo {
112112

113113
/** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */
114114
void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime);
115+
116+
/** @return the error state. */
117+
ErrorState getErrorState();
118+
119+
/** Error state such as the count for consecutive errors. */
120+
interface ErrorState {
121+
/**
122+
* If it is an error, increment the count; otherwise, reset the count to 0.
123+
*
124+
* @return the updated error count.
125+
*/
126+
int updateErrorCount(boolean isError);
127+
128+
/**
129+
* Each error count is returned only once.
130+
* Subsequent calls for the same error count return 0.
131+
* <p>
132+
* For example,
133+
* 1. Error count is 3
134+
* 2. Calling getErrorCountToDelay() returns 3
135+
* 3. Calling getErrorCountToDelay() again returns 0
136+
* 4. Error count is incremented to 4
137+
* 5. Calling getErrorCountToDelay() returns 4
138+
* 6. Calling getErrorCountToDelay() again returns 0
139+
*
140+
* @return each error count only once.
141+
*/
142+
int getErrorCountToDelay();
143+
}
115144
}

ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class FollowerInfoImpl implements FollowerInfo {
4242
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
4343
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
4444
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
45+
private final ErrorState errorState = new ErrorStateImpl();
4546
private volatile boolean caughtUp;
4647
private volatile boolean ackInstallSnapshotAttempt = false;
4748

@@ -240,4 +241,35 @@ public Timestamp getLastRespondedAppendEntriesSendTime() {
240241
public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) {
241242
lastRespondedAppendEntriesSendTime.set(sendTime);
242243
}
244+
245+
@Override
246+
public ErrorState getErrorState() {
247+
return errorState;
248+
}
249+
250+
static class ErrorStateImpl implements ErrorState {
251+
/** The number of consecutive errors without getting a successful reply for a particular follower. */
252+
private int errorCount = 0;
253+
private int lastReturnedErrorCount = 0;
254+
255+
@Override
256+
public synchronized int updateErrorCount(boolean isError) {
257+
if (isError) {
258+
errorCount++;
259+
} else {
260+
errorCount = 0;
261+
lastReturnedErrorCount = 0;
262+
}
263+
return errorCount;
264+
}
265+
266+
@Override
267+
public synchronized int getErrorCountToDelay() {
268+
if (errorCount == lastReturnedErrorCount) {
269+
return 0;
270+
}
271+
lastReturnedErrorCount = errorCount;
272+
return errorCount;
273+
}
274+
}
243275
}

ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ private void runImpl() {
158158
}
159159
synchronized (server) {
160160
if (roleChangeChecking(electionTimeout)) {
161-
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
162-
this, lastRpcTime.elapsedTime(), electionTimeout);
161+
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}ms, electionTimeout:{}",
162+
this, lastRpcTime.elapsedTimeMs(), electionTimeout);
163163
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
164164
// election timeout, should become a candidate
165165
server.changeToCandidate(false);

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
import static org.apache.ratis.server.impl.ServerProtoUtils.toReadIndexRequestProto;
151151
import static org.apache.ratis.server.impl.ServerProtoUtils.toRequestVoteReplyProto;
152152
import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto;
153+
import static org.apache.ratis.server.raftlog.LogProtoUtils.toLogEntryTermIndexString;
153154
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
154155
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
155156
import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString;
@@ -1703,6 +1704,11 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
17031704
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
17041705
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
17051706
return appendFuture.whenCompleteAsync((r, t) -> {
1707+
if (t != null) {
1708+
LOG.warn("{}: appendEntries* failed: {}", getMemberId(), toLogEntryTermIndexString(entries), t);
1709+
} else if (LOG.isDebugEnabled()) {
1710+
LOG.debug("{}: appendEntries* succeeded {}", getMemberId(), toLogEntryTermIndexString(entries));
1711+
}
17061712
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
17071713
timer.stop();
17081714
}, getServerExecutor()).thenApply(v -> {
@@ -1753,7 +1759,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
17531759
&& !(appendLogTermIndices != null && appendLogTermIndices.contains(previous))
17541760
&& !state.containsTermIndex(previous)) {
17551761
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
1756-
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
1762+
LOG.info("{}: Failed appendEntries as previous log entry {} is not found", getMemberId(), previous);
17571763
return replyNextIndex;
17581764
}
17591765

ratis-server/src/main/java/org/apache/ratis/server/raftlog/LogProtoUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,18 @@ public static String toLogEntriesShortString(List<LogEntryProto> entries,
9292
: "size=" + entries.size() + ", first=" + toLogEntryString(entries.get(0), stateMachineToString);
9393
}
9494

95+
public static String toLogEntryTermIndexString(List<LogEntryProto> entries) {
96+
final int n = entries.size();
97+
return n == 0 ? toLogEntryTermIndexString(n, null, null)
98+
: toLogEntryTermIndexString(n, TermIndex.valueOf(entries.get(0)), TermIndex.valueOf(entries.get(n - 1)));
99+
}
100+
101+
public static String toLogEntryTermIndexString(int n, TermIndex first, TermIndex last) {
102+
return n == 0 ? "HEARTBEAT"
103+
: n == 1 ? "entry" + first
104+
: n + " entries:" + first + "..." + last;
105+
}
106+
95107
public static LogEntryProto toLogEntryProto(RaftConfiguration conf, Long term, long index) {
96108
final LogEntryProto.Builder b = LogEntryProto.newBuilder();
97109
Optional.ofNullable(term).ifPresent(b::setTerm);

0 commit comments

Comments
 (0)