Skip to content

Commit f33dc62

Browse files
committed
RATIS-2294. Add Timeout.
1 parent 0fe1bc2 commit f33dc62

File tree

2 files changed

+68
-4
lines changed

2 files changed

+68
-4
lines changed

ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.ratis.netty.client;
1919

20+
import org.apache.ratis.client.RaftClientConfigKeys;
2021
import org.apache.ratis.client.impl.ClientProtoUtils;
2122
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
2223
import org.apache.ratis.conf.RaftProperties;
@@ -28,36 +29,97 @@
2829
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
2930
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
3031
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
32+
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
33+
import org.apache.ratis.protocol.exceptions.NotLeaderException;
34+
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
3135
import org.apache.ratis.util.JavaUtils;
36+
import org.apache.ratis.util.TimeDuration;
37+
import org.apache.ratis.util.TimeoutExecutor;
38+
import org.slf4j.Logger;
39+
import org.slf4j.LoggerFactory;
3240

3341
import java.io.IOException;
3442
import java.util.concurrent.CompletableFuture;
43+
import java.util.function.Supplier;
3544

3645
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
46+
47+
public static final Logger LOG = LoggerFactory.getLogger(NettyClientRpc.class);
48+
49+
private Supplier<String> name;
50+
private ClientId cId;
51+
private final TimeDuration requestTimeoutDuration;
52+
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
53+
3754
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
3855
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
56+
this.cId = clientId;
57+
this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
3958
}
4059

4160
@Override
4261
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
4362
final RaftPeerId serverId = request.getServerId();
63+
long callId = request.getCallId();
4464
try {
65+
name = JavaUtils.memoize(() -> cId + "->" + serverId);
4566
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
4667
final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
47-
return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
68+
final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
69+
70+
proxy.sendAsync(serverRequestProto).thenApply(replyProto -> {
4871
if (request instanceof GroupListRequest) {
4972
return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply());
5073
} else if (request instanceof GroupInfoRequest) {
5174
return ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply());
5275
} else {
5376
return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
5477
}
78+
}).thenCompose(raftReply -> {
79+
final NotLeaderException nle = raftReply.getNotLeaderException();
80+
if (nle != null) {
81+
return failedFuture(nle);
82+
}
83+
final LeaderNotReadyException lnre = raftReply.getLeaderNotReadyException();
84+
if (lnre != null) {
85+
return failedFuture(lnre);
86+
}
87+
return CompletableFuture.completedFuture(raftReply);
88+
}).whenComplete((raftReply, ex) -> {
89+
if (ex != null) {
90+
replyFuture.completeExceptionally(ex);
91+
} else {
92+
replyFuture.complete(raftReply);
93+
}
5594
});
95+
96+
scheduler.onTimeout(requestTimeoutDuration,
97+
() -> {
98+
if (!replyFuture.isDone()) {
99+
TimeoutIOException timeout = new TimeoutIOException(
100+
getName()+ " request #" + callId + " timeout " +
101+
requestTimeoutDuration.getDuration());
102+
replyFuture.completeExceptionally(timeout);
103+
}
104+
}, LOG, () -> "Timeout check for client request #" + callId
105+
);
106+
107+
return replyFuture;
56108
} catch (Throwable e) {
57109
return JavaUtils.completeExceptionally(e);
58110
}
59111
}
60112

113+
private String getName() {
114+
return name.get();
115+
}
116+
117+
private <T> CompletableFuture<T> failedFuture(Throwable ex) {
118+
CompletableFuture<T> future = new CompletableFuture<>();
119+
future.completeExceptionally(ex);
120+
return future;
121+
}
122+
61123
@Override
62124
public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
63125
final RaftPeerId serverId = request.getServerId();

ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.apache.ratis.util.function.CheckedRunnable;
5353
import org.junit.jupiter.api.Assertions;
5454
import org.junit.jupiter.api.Test;
55+
import org.junit.jupiter.api.Timeout;
5556
import org.slf4j.event.Level;
5657

5758
import java.io.IOException;
@@ -69,6 +70,7 @@
6970

7071
import static org.apache.ratis.RaftTestUtil.waitForLeader;
7172

73+
@Timeout(100)
7274
public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends BaseTest
7375
implements MiniRaftCluster.Factory.Get<CLUSTER> {
7476
{
@@ -417,13 +419,13 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
417419
LOG.info("Running testAppendEntriesTimeout");
418420
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
419421
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
420-
waitForLeader(cluster);
422+
RaftPeerId id = waitForLeader(cluster).getId();
421423
long time = System.currentTimeMillis();
422424
long waitTime = 5000;
423425
try (final RaftClient client = cluster.createClient()) {
424426
// block append requests
425427
cluster.getServerAliveStream()
426-
.filter(impl -> !impl.getInfo().isLeader())
428+
.filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() != id)
427429
.map(SimpleStateMachine4Testing::get)
428430
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
429431

@@ -433,7 +435,7 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
433435
Assertions.assertFalse(replyFuture.isDone());
434436
// unblock append request.
435437
cluster.getServerAliveStream()
436-
.filter(impl -> !impl.getInfo().isLeader())
438+
.filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() != id)
437439
.map(SimpleStateMachine4Testing::get)
438440
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
439441

0 commit comments

Comments
 (0)