Skip to content

Commit 74610fc

Browse files
committed
RATIS-2506. Delay consecutive GrpcLogAppender restart.
1 parent 73655f4 commit 74610fc

8 files changed

Lines changed: 135 additions & 20 deletions

File tree

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ CompletableFuture<Integer> submitCommit(
276276
final WriteInfo info = writeInfos.get(index);
277277
if (info == null) {
278278
return JavaUtils.completeExceptionally(
279-
new IOException(name.get() + " is already committed."));
279+
new IOException(name.get() + " not found."));
280280
}
281281

282282
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,17 @@ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT watchImpl(
171171

172172
public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
173173
throws IOException {
174+
LOG.trace("write {}, offset={}, length={}, close? {}", path, offset, buffer.remaining(), close);
175+
try {
176+
return writeImpl(path, offset, close, buffer, sync);
177+
} catch (IOException e) {
178+
LOG.error("Failed to write {}, offset {}, close? {}, sync? {}", path, offset, close, sync, e);
179+
throw e;
180+
}
181+
}
182+
183+
private long writeImpl(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
184+
throws IOException {
174185
final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
175186
buffer.limit(chunkSize);
176187
final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync);

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.ratis.protocol.Message;
3232
import org.apache.ratis.protocol.RaftClientRequest;
3333
import org.apache.ratis.protocol.RaftGroupId;
34+
import org.apache.ratis.protocol.RaftPeerId;
3435
import org.apache.ratis.server.RaftServer;
3536
import org.apache.ratis.server.storage.RaftStorage;
3637
import org.apache.ratis.statemachine.StateMachineStorage;
@@ -44,8 +45,23 @@
4445
import java.io.IOException;
4546
import java.nio.file.Path;
4647
import java.util.concurrent.CompletableFuture;
48+
import java.util.concurrent.atomic.AtomicReference;
4749

4850
public class FileStoreStateMachine extends BaseStateMachine {
51+
static class SimulateFailure {
52+
static final String PATH_TO_FAIL = "path-to-fail";
53+
private static final AtomicReference<RaftPeerId> CHOSEN = new AtomicReference<>();
54+
55+
static boolean chooseServer(RaftPeerId serverId) {
56+
if (SimulateFailure.CHOSEN.compareAndSet(null, serverId)) {
57+
LOG.info("Server {} is chosen", serverId);
58+
return true;
59+
} else {
60+
return serverId.equals(SimulateFailure.CHOSEN.get());
61+
}
62+
}
63+
}
64+
4965
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
5066

5167
private final FileStore files;
@@ -120,6 +136,15 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftP
120136
.build();
121137
}
122138

139+
void simulateFailure(String path) throws Exception {
140+
final RaftServer.Division division = getServer().get().getDivision(getGroupId());
141+
if (path.equals(SimulateFailure.PATH_TO_FAIL)
142+
&& division.getInfo().isFollower()
143+
&& SimulateFailure.chooseServer(division.getId())) {
144+
throw new IOException(getId() + ": Simulated failure for path " + path);
145+
}
146+
}
147+
123148
@Override
124149
public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext context) {
125150
final FileStoreRequestProto proto = getProto(context, entry);
@@ -128,6 +153,12 @@ public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext
128153
}
129154

130155
final WriteRequestHeaderProto h = proto.getWriteHeader();
156+
try {
157+
simulateFailure(h.getPath().toStringUtf8());
158+
} catch (Exception e) {
159+
return FileStoreCommon.completeExceptionally(entry.getIndex(), getId() + ": Failed simulateFailure", e);
160+
}
161+
131162
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
132163
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
133164
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());

ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@
4747
import java.util.concurrent.TimeUnit;
4848
import java.util.concurrent.atomic.AtomicBoolean;
4949

50+
import static org.apache.ratis.examples.filestore.FileStoreStateMachine.SimulateFailure.PATH_TO_FAIL;
51+
5052
public abstract class FileStoreBaseTest<CLUSTER extends MiniRaftCluster>
5153
extends BaseTest
5254
implements MiniRaftCluster.Factory.Get<CLUSTER> {
@@ -144,6 +146,18 @@ public void testFileStore() throws Exception {
144146
cluster.shutdown();
145147
}
146148

149+
@Test
150+
public void testWriteStateMachineDataFailure() throws Exception {
151+
final CLUSTER cluster = newCluster(NUM_PEERS);
152+
cluster.start();
153+
RaftTestUtil.waitForLeader(cluster);
154+
155+
final CheckedSupplier<FileStoreClient, IOException> newClient = () -> newFileStoreClient(cluster);
156+
testSingleFile(PATH_TO_FAIL, SizeInBytes.valueOf("100k"), newClient);
157+
testSingleFile("bar", SizeInBytes.valueOf("2k"), newClient);
158+
cluster.shutdown();
159+
}
160+
147161
private static FileStoreWriter writeSingleFile(
148162
String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient)
149163
throws Exception {

ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,8 +126,6 @@ FileStoreWriter write(boolean sync) throws IOException {
126126

127127
final ByteBuffer b = randomBytes(length, r);
128128

129-
LOG.trace("write {}, offset={}, length={}, close? {}",
130-
fileName, offset, length, close);
131129
final long written = client.write(fileName, offset, close, b, sync);
132130
Assertions.assertEquals(length, written);
133131
offset += length;
@@ -221,6 +219,8 @@ FileStoreWriter verify() throws IOException {
221219
verify(read, offset, n, expected);
222220
offset += n;
223221
}
222+
223+
LOG.info("XXX Verify successful: {}", fileName);
224224
return this;
225225
}
226226

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -129,31 +129,21 @@ boolean isError() {
129129
}
130130
}
131131

132-
static class ReplyState {
132+
class ReplyState {
133133
private boolean firstReplyReceived = false;
134-
private int errorCount = 0;
135134

136135
synchronized boolean isFirstReplyReceived() {
137136
return firstReplyReceived;
138137
}
139138

140-
synchronized int getErrorCount() {
141-
return errorCount;
142-
}
143-
144139
int process(AppendResult result) {
145140
return process(result == AppendResult.INCONSISTENCY? Event.APPEND_ENTRIES_INCONSISTENCY_REPLY
146141
: Event.APPEND_ENTRIES_REPLY);
147142
}
148143

149144
synchronized int process(Event event) {
150145
firstReplyReceived = event.updateFirstReplyReceived(firstReplyReceived);
151-
if (event.isError()) {
152-
errorCount++;
153-
} else {
154-
errorCount = 0;
155-
}
156-
return errorCount;
146+
return getFollower().getErrorState().updateErrorCount(event.isError());
157147
}
158148
}
159149

@@ -300,17 +290,25 @@ private boolean isSlowFollower() {
300290
private void mayWait() {
301291
// use lastSend time instead of lastResponse time
302292
try {
303-
getEventAwaitForSignal().await(getWaitTimeMs() + errorWaitTimeMs(),
304-
TimeUnit.MILLISECONDS);
293+
// For errors, the sleep cannot be woken up by a signal
294+
sleepForErrors();
295+
// The normal await can be woken up by a signal
296+
getEventAwaitForSignal().await(getWaitTimeMs(), TimeUnit.MILLISECONDS);
305297
} catch (InterruptedException ie) {
306298
Thread.currentThread().interrupt();
307299
LOG.warn("{} is interrupted: {}", this, ie.toString());
308300
}
309301
}
310302

311-
private long errorWaitTimeMs() {
312-
return errorRetryWaitPolicy.handleAttemptFailure(replyState::getErrorCount)
313-
.getSleepTime().toLong(TimeUnit.MILLISECONDS);
303+
private void sleepForErrors() throws InterruptedException {
304+
final int errorCount = getFollower().getErrorState().getErrorCountToDelay();
305+
if (errorCount < 1) {
306+
return;
307+
}
308+
309+
final TimeDuration sleepTime = errorRetryWaitPolicy.handleAttemptFailure(() -> errorCount).getSleepTime();
310+
LOG.debug("{}: sleepForErrors {}, errorCount={}", this, sleepTime, errorCount);
311+
sleepTime.sleep();
314312
}
315313

316314
@Override

ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,33 @@ public interface FollowerInfo {
112112

113113
/** Update lastRpcResponseTime and lastRespondedAppendEntriesSendTime */
114114
void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime);
115+
116+
/** @return the error state. */
117+
ErrorState getErrorState();
118+
119+
/** Error state such as the count for consecutive errors. */
120+
interface ErrorState {
121+
/**
122+
* If it is an error, increment the count; otherwise, reset the count to 0.
123+
*
124+
* @return the updated error count.
125+
*/
126+
int updateErrorCount(boolean isError);
127+
128+
/**
129+
* Each error count is returned only once.
130+
* For subsequent calls with the same error count, it returns 0.
131+
* <p>
132+
* For example,
133+
* 1. Error count is 3
134+
* 2. Calling getErrorCountToDelay() returns 3
135+
* 3. Calling getErrorCountToDelay() again returns 0
136+
* 4. Error count is incremented to 4
137+
* 5. Calling getErrorCountToDelay() returns 4
138+
* 6. Calling getErrorCountToDelay() again returns 0
139+
*
140+
* @return each error count only once.
141+
*/
142+
int getErrorCountToDelay();
143+
}
115144
}

ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class FollowerInfoImpl implements FollowerInfo {
4242
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
4343
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
4444
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
45+
private final ErrorState errorState = new ErrorStateImpl();
4546
private volatile boolean caughtUp;
4647
private volatile boolean ackInstallSnapshotAttempt = false;
4748

@@ -240,4 +241,35 @@ public Timestamp getLastRespondedAppendEntriesSendTime() {
240241
public void updateLastRespondedAppendEntriesSendTime(Timestamp sendTime) {
241242
lastRespondedAppendEntriesSendTime.set(sendTime);
242243
}
244+
245+
@Override
246+
public ErrorState getErrorState() {
247+
return errorState;
248+
}
249+
250+
static class ErrorStateImpl implements ErrorState {
251+
/** The number of consecutive errors without getting a successful reply for a particular follower. */
252+
private int errorCount = 0;
253+
private int lastReturnedErrorCount = 0;
254+
255+
@Override
256+
public synchronized int updateErrorCount(boolean isError) {
257+
if (isError) {
258+
errorCount++;
259+
} else {
260+
errorCount = 0;
261+
lastReturnedErrorCount = 0;
262+
}
263+
return errorCount;
264+
}
265+
266+
@Override
267+
public synchronized int getErrorCountToDelay() {
268+
if (errorCount == lastReturnedErrorCount) {
269+
return 0;
270+
}
271+
lastReturnedErrorCount = errorCount;
272+
return errorCount;
273+
}
274+
}
243275
}

0 commit comments

Comments
 (0)