Skip to content

Commit f54b318

Browse files
committed
RATIS-2506. Delay consecutive GrpcLogAppender restart.
1 parent f141e10 commit f54b318

7 files changed

Lines changed: 97 additions & 26 deletions

File tree

ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static Throwable unwrapThrowable(Throwable t) {
9797
return unwrapped;
9898
}
9999
}
100-
return t;
100+
return JavaUtils.unwrapCompletionException(t);
101101
}
102102

103103
static IOException unwrapException(StatusRuntimeException se) {

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
import java.util.concurrent.TimeUnit;
6464
import java.util.concurrent.atomic.AtomicLong;
6565

66+
import static org.apache.ratis.server.util.ServerStringUtils.toLogEntryString;
67+
6668
/**
6769
* A new log appender implementation using grpc bi-directional stream API.
6870
*/
@@ -127,31 +129,21 @@ boolean isError() {
127129
}
128130
}
129131

130-
static class ReplyState {
132+
class ReplyState {
131133
private boolean firstReplyReceived = false;
132-
private int errorCount = 0;
133134

134135
synchronized boolean isFirstReplyReceived() {
135136
return firstReplyReceived;
136137
}
137138

138-
synchronized int getErrorCount() {
139-
return errorCount;
140-
}
141-
142139
int process(AppendResult result) {
143140
return process(result == AppendResult.INCONSISTENCY? Event.APPEND_ENTRIES_INCONSISTENCY_REPLY
144141
: Event.APPEND_ENTRIES_REPLY);
145142
}
146143

147144
synchronized int process(Event event) {
148145
firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived);
149-
if (event.isError()) {
150-
errorCount++;
151-
} else {
152-
errorCount = 0;
153-
}
154-
return errorCount;
146+
return getFollower().getErrorState().updateErrorCount(event.isError());
155147
}
156148
}
157149

@@ -298,17 +290,28 @@ private boolean isSlowFollower() {
298290
private void mayWait() {
299291
// use lastSend time instead of lastResponse time
300292
try {
301-
getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
302-
TimeUnit.MILLISECONDS);
293+
sleepForError();
294+
getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
303295
} catch (InterruptedException ie) {
304296
LOG.warn(this + ": Wait interrupted by " + ie);
305297
Thread.currentThread().interrupt();
306298
}
307299
}
308300

309-
private long errorWaitTimeMs() {
310-
return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount)
311-
.getSleepTime().toLong(TimeUnit.MILLISECONDS);
301+
void sleepForError() {
302+
final int errorCount = getFollower().getErrorState().getErrorCountToDelay();
303+
if (errorCount < 1) {
304+
return;
305+
}
306+
307+
final TimeDuration sleepTime = errorRetryWaitPolicy.handleAttemptFailure(() -> errorCount).getSleepTime();
308+
try {
309+
LOG.debug("{}: sleepForError {}, errorCount={}", this, sleepTime, errorCount);
310+
sleepTime.sleep();
311+
} catch (InterruptedException e) {
312+
Thread.currentThread().interrupt();
313+
LOG.warn("{}: Interrupted sleepForError {}, errorCount={}", this, sleepTime, errorCount);
314+
}
312315
}
313316

314317
@Override
@@ -889,13 +892,9 @@ boolean isHeartbeat() {
889892

890893
@Override
891894
public String toString() {
892-
final String entries = entriesCount == 0? ""
893-
: entriesCount == 1? ",entry=" + firstEntry
894-
: ",entries=" + firstEntry + "..." + lastEntry;
895895
return JavaUtils.getClassSimpleName(getClass())
896896
+ ":cid=" + callId
897-
+ ",entriesCount=" + entriesCount
898-
+ entries;
897+
+ "," + toLogEntryString(entriesCount, firstEntry, lastEntry);
899898
}
900899
}
901900

ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,25 @@ public interface FollowerInfo {
112112

113113
/** Update lastRpcResponseTime and LastRespondedAppendEntriesSendTime */
114114
void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime);
115+
116+
/** @return the error state. */
117+
ErrorState getErrorState();
118+
119+
/** Error state such as the count for consecutive errors. */
120+
interface ErrorState {
121+
/**
122+
* If it is an error, increment the count; otherwise, reset the count to 0.
123+
*
124+
* @return the updated count.
125+
*/
126+
int updateErrorCount(boolean isError);
127+
128+
/**
129+
* For each error count, it should delay only once.
130+
*
131+
* @return each error count only once in order to trigger a delay.
132+
* The subsequent calls for the same error count, it returns 0.
133+
*/
134+
int getErrorCountToDelay();
135+
}
115136
}

ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class FollowerInfoImpl implements FollowerInfo {
4242
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
4343
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
4444
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
45+
private final ErrorState errorState = new ErrorStateImpl();
4546
private volatile boolean caughtUp;
4647
private volatile boolean ackInstallSnapshotAttempt = false;
4748

@@ -240,4 +241,35 @@ public Timestamp getLastRespondedAppendEntriesSendTime() {
240241
public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) {
241242
lastRespondedAppendEntriesSendTime.set(sendTime);
242243
}
244+
245+
@Override
246+
public ErrorState getErrorState() {
247+
return errorState;
248+
}
249+
250+
static class ErrorStateImpl implements ErrorState {
251+
/** The number of consecutive errors without getting a successful reply for this follower. */
252+
private int errorCount = 0;
253+
private int lastReturnedErrorCount = 0;
254+
255+
@Override
256+
public synchronized int updateErrorCount(boolean isError) {
257+
if (isError) {
258+
errorCount++;
259+
} else {
260+
errorCount = 0;
261+
lastReturnedErrorCount = 0;
262+
}
263+
return errorCount;
264+
}
265+
266+
@Override
267+
public synchronized int getErrorCountToDelay() {
268+
if (errorCount == lastReturnedErrorCount) {
269+
return 0;
270+
}
271+
lastReturnedErrorCount = errorCount;
272+
return errorCount;
273+
}
274+
}
243275
}

ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ private void runImpl() {
158158
}
159159
synchronized (server) {
160160
if (roleChangeChecking(electionTimeout)) {
161-
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
162-
this, lastRpcTime.elapsedTime(), electionTimeout);
161+
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}ms, electionTimeout:{}",
162+
this, lastRpcTime.elapsedTimeMs(), electionTimeout);
163163
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
164164
// election timeout, should become a candidate
165165
server.changeToCandidate(false);

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@
151151
import static org.apache.ratis.server.impl.ServerProtoUtils.toStartLeaderElectionReplyProto;
152152
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesReplyString;
153153
import static org.apache.ratis.server.util.ServerStringUtils.toAppendEntriesRequestString;
154+
import static org.apache.ratis.server.util.ServerStringUtils.toLogEntryString;
154155
import static org.apache.ratis.server.util.ServerStringUtils.toRequestVoteReplyString;
155156

156157
class RaftServerImpl implements RaftServer.Division,
@@ -1698,6 +1699,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16981699
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
16991700
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
17001701
return appendFuture.whenCompleteAsync((r, t) -> {
1702+
if (t != null) {
1703+
LOG.warn("{}: appendEntries* failed: {}", getMemberId(), toLogEntryString(entries), t);
1704+
} else if (LOG.isDebugEnabled()) {
1705+
LOG.debug("{}: appendEntries* succeeded {}", getMemberId(), toLogEntryString(entries));
1706+
}
1707+
17011708
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
17021709
timer.stop();
17031710
}, getServerExecutor()).thenApply(v -> {
@@ -1747,7 +1754,7 @@ private long checkInconsistentAppendEntries(TermIndex previous, List<LogEntryPro
17471754
&& !(appendLogTermIndices != null && appendLogTermIndices.contains(previous))
17481755
&& !state.containsTermIndex(previous)) {
17491756
final long replyNextIndex = Math.min(state.getNextIndex(), previous.getIndex());
1750-
LOG.info("{}: Failed appendEntries as previous log entry ({}) is not found", getMemberId(), previous);
1757+
LOG.info("{}: Failed appendEntries as previous log entry {} is not found", getMemberId(), previous);
17511758
return replyNextIndex;
17521759
}
17531760

ratis-server/src/main/java/org/apache/ratis/server/util/ServerStringUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@
4545
public final class ServerStringUtils {
4646
private ServerStringUtils() {}
4747

48+
public static String toLogEntryString(List<LogEntryProto> entries) {
49+
final int n = entries.size();
50+
return n == 0 ? toLogEntryString(n, null, null)
51+
: toLogEntryString(n, TermIndex.valueOf(entries.get(0)), TermIndex.valueOf(entries.get(n - 1)));
52+
}
53+
54+
public static String toLogEntryString(int n, TermIndex first, TermIndex last) {
55+
return n == 0 ? "HEARTBEAT"
56+
: n == 1 ? "entry" + first
57+
: n + " entries:" + first + "..." + last;
58+
}
59+
4860
public static String toAppendEntriesRequestString(AppendEntriesRequestProto request,
4961
Function<StateMachineLogEntryProto, String> stateMachineToString) {
5062
if (request == null) {

0 commit comments

Comments
 (0)