Skip to content

Commit 62636d2

Browse files
committed
testWriteStateMachineDataFailure
1 parent f141e10 commit 62636d2

10 files changed

Lines changed: 78 additions & 8 deletions

File tree

ratis-common/src/main/java/org/apache/ratis/util/LogUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ static void warn(Logger log, Supplier<String> message, Throwable t, Class<?>...
126126
}
127127
log.warn(b.toString());
128128
} else {
129-
log.warn(message.get(), t);
129+
log.warn(message.get() + ": " + t);
130130
}
131131
}
132132
}

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileInfo.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ CompletableFuture<Integer> submitCommit(
276276
final WriteInfo info = writeInfos.get(index);
277277
if (info == null) {
278278
return JavaUtils.completeExceptionally(
279-
new IOException(name.get() + " is already committed."));
279+
new IOException(name.get() + " not found."));
280280
}
281281

282282
final CheckedSupplier<Integer, IOException> task = LogUtils.newCheckedSupplier(LOG, () -> {

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreClient.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,15 @@ private static <OUTPUT, THROWABLE extends Throwable> OUTPUT watchImpl(
171171

172172
public long write(String path, long offset, boolean close, ByteBuffer buffer, boolean sync)
173173
throws IOException {
174-
final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
175-
buffer.limit(chunkSize);
176-
final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync);
177-
return WriteReplyProto.parseFrom(reply).getLength();
174+
try {
175+
final int chunkSize = FileStoreCommon.getChunkSize(buffer.remaining());
176+
buffer.limit(chunkSize);
177+
final ByteString reply = writeImpl(this::send, path, offset, close, buffer, sync);
178+
return WriteReplyProto.parseFrom(reply).getLength();
179+
} catch (IOException e) {
180+
LOG.error("XXX Write failed for {} at offset {}, close? {}, sync? {}", path, offset, close, sync, e);
181+
throw e;
182+
}
178183
}
179184

180185
public DataStreamOutput getStreamOutput(String path, long dataSize, RoutingTable routingTable) {

ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.ratis.protocol.Message;
3232
import org.apache.ratis.protocol.RaftClientRequest;
3333
import org.apache.ratis.protocol.RaftGroupId;
34+
import org.apache.ratis.protocol.RaftPeerId;
3435
import org.apache.ratis.server.RaftServer;
3536
import org.apache.ratis.server.storage.RaftStorage;
3637
import org.apache.ratis.statemachine.StateMachineStorage;
@@ -44,6 +45,7 @@
4445
import java.io.IOException;
4546
import java.nio.file.Path;
4647
import java.util.concurrent.CompletableFuture;
48+
import java.util.concurrent.atomic.AtomicReference;
4749

4850
public class FileStoreStateMachine extends BaseStateMachine {
4951
private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage();
@@ -120,6 +122,8 @@ public TransactionContext startTransaction(LogEntryProto entry, RaftProtos.RaftP
120122
.build();
121123
}
122124

125+
static final AtomicReference<RaftPeerId> chosenFollower = new AtomicReference<>();
126+
123127
@Override
124128
public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext context) {
125129
final FileStoreRequestProto proto = getProto(context, entry);
@@ -128,6 +132,27 @@ public CompletableFuture<Integer> write(LogEntryProto entry, TransactionContext
128132
}
129133

130134
final WriteRequestHeaderProto h = proto.getWriteHeader();
135+
final String path = h.getPath().toStringUtf8();
136+
try {
137+
final RaftServer.Division division = getServer().join().getDivision(getGroupId());
138+
final RaftPeerId serverId = division.getId();
139+
final boolean isFollower = path.equals("foo") && division.getInfo().isFollower();
140+
if (isFollower) {
141+
final boolean shouldFail;
142+
if (chosenFollower.compareAndSet(null, serverId)) {
143+
LOG.info("XXX follower {} is chosen", serverId);
144+
shouldFail = true;
145+
} else {
146+
shouldFail = serverId.equals(chosenFollower.get());
147+
}
148+
if (shouldFail) {
149+
return FileStoreCommon.completeExceptionally(entry.getIndex(), getId() + ": XXX: Failed follower for " + path);
150+
}
151+
}
152+
} catch (IOException e) {
153+
LOG.error("XXX Failed to check follower {}", path, e);
154+
}
155+
131156
final CompletableFuture<Integer> f = files.write(entry.getIndex(),
132157
h.getPath().toStringUtf8(), h.getClose(), h.getSync(), h.getOffset(),
133158
entry.getStateMachineLogEntry().getStateMachineEntry().getStateMachineData());

ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreBaseTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,24 @@ public void testFileStore() throws Exception {
144144
cluster.shutdown();
145145
}
146146

147+
@Test
148+
public void testWriteStateMachineDataFailure() throws Exception {
149+
final CLUSTER cluster = newCluster(NUM_PEERS);
150+
cluster.start();
151+
RaftTestUtil.waitForLeader(cluster);
152+
153+
final CheckedSupplier<FileStoreClient, IOException> newClient = () -> newFileStoreClient(cluster);
154+
155+
try {
156+
testSingleFile("foo", SizeInBytes.valueOf("1m"), newClient);
157+
} catch (Exception e) {
158+
LOG.info("XXX Failed: foo", e);
159+
}
160+
161+
testSingleFile("bar", SizeInBytes.valueOf("2k"), newClient);
162+
cluster.shutdown();
163+
}
164+
147165
private static FileStoreWriter writeSingleFile(
148166
String path, SizeInBytes fileLength, CheckedSupplier<FileStoreClient, IOException> newClient)
149167
throws Exception {

ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,8 @@ FileStoreWriter verify() throws IOException {
221221
verify(read, offset, n, expected);
222222
offset += n;
223223
}
224+
225+
LOG.info("XXX Verify successful: {}", fileName);
224226
return this;
225227
}
226228

ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static Throwable unwrapThrowable(Throwable t) {
9797
return unwrapped;
9898
}
9999
}
100-
return t;
100+
return JavaUtils.unwrapCompletionException(t);
101101
}
102102

103103
static IOException unwrapException(StatusRuntimeException se) {

ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import java.util.concurrent.ConcurrentHashMap;
6262
import java.util.concurrent.ExecutionException;
6363
import java.util.concurrent.TimeUnit;
64+
import java.util.concurrent.atomic.AtomicBoolean;
6465
import java.util.concurrent.atomic.AtomicLong;
6566

6667
/**
@@ -168,6 +169,7 @@ synchronized int process(Event event) {
168169
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
169170
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
170171
private volatile StreamObservers appendLogRequestObserver;
172+
private final AtomicBoolean alreadyReset = new AtomicBoolean(false);
171173
private final boolean useSeparateHBChannel;
172174

173175
private final GrpcServerMetrics grpcServerMetrics;
@@ -212,6 +214,16 @@ private GrpcServerProtocolClient getClient() throws IOException {
212214
}
213215

214216
private void resetClient(AppendEntriesRequest request, Event event) {
217+
if (!alreadyReset.compareAndSet(false, true)) {
218+
try {
219+
final TimeDuration sleep = TimeDuration.ONE_SECOND;
220+
LOG.warn("XXX {}: already reset client, sleep {}", this, sleep);
221+
sleep.sleep();
222+
} catch (InterruptedException e) {
223+
throw new IllegalStateException(e);
224+
}
225+
}
226+
215227
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
216228
getClient().resetConnectBackoff();
217229
if (appendLogRequestObserver != null) {
@@ -511,6 +523,7 @@ private void onNextImpl(AppendEntriesRequest request, AppendEntriesReplyProto re
511523

512524
switch (reply.getResult()) {
513525
case SUCCESS:
526+
alreadyReset.set(false);
514527
grpcServerMetrics.onRequestSuccess(getFollowerId().toString(), reply.getIsHearbeat());
515528
getLeaderState().onFollowerCommitIndex(getFollower(), reply.getFollowerCommit());
516529
if (getFollower().updateMatchIndex(reply.getMatchIndex())) {
@@ -675,6 +688,7 @@ public void onNext(InstallSnapshotReplyProto reply) {
675688
final long followerSnapshotIndex;
676689
switch (reply.getResult()) {
677690
case SUCCESS:
691+
alreadyReset.set(false);
678692
LOG.info("{}: Completed", this);
679693
getFollower().setAttemptedToInstallSnapshot();
680694
removePending(reply);

ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ private void runImpl() {
159159
synchronized (server) {
160160
if (roleChangeChecking(electionTimeout)) {
161161
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
162-
this, lastRpcTime.elapsedTime(), electionTimeout);
162+
this, lastRpcTime.elapsedTime().toString(TimeUnit.MILLISECONDS, 3), electionTimeout);
163163
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
164164
// election timeout, should become a candidate
165165
server.changeToCandidate(false);

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1698,6 +1698,12 @@ leaderId, getMemberId(), currentTerm, followerCommit, inconsistencyReplyNextInde
16981698
final long commitIndex = effectiveCommitIndex(proto.getLeaderCommit(), previous, entries.size());
16991699
final long matchIndex = isHeartbeat? RaftLog.INVALID_LOG_INDEX: entries.get(entries.size() - 1).getIndex();
17001700
return appendFuture.whenCompleteAsync((r, t) -> {
1701+
if (t != null) {
1702+
LOG.error("{}: XXX appendEntries* failed: {}", getMemberId(), t.toString());
1703+
} else if (!entries.isEmpty()) {
1704+
LOG.info("{}: XXX appendEntries* succeeded {} entries: {} -> {}",
1705+
getMemberId(), entries.size(), entries.get(0).getIndex(), entries.get(entries.size() - 1).getIndex());
1706+
}
17011707
followerState.ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.APPEND_COMPLETE));
17021708
timer.stop();
17031709
}, getServerExecutor()).thenApply(v -> {

0 commit comments

Comments
 (0)