Skip to content

Commit dec03c4

Browse files
committed
RATIS-2294. Fix NettyClientRpc exception and timeout handling (apache#1264)
(cherry picked from commit 8634542)
1 parent f33dc62 commit dec03c4

File tree

3 files changed: 40 additions (+40) and 50 deletions (-50).

ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java

Lines changed: 24 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -29,8 +29,6 @@
2929
import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
3030
import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
3131
import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto;
32-
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
33-
import org.apache.ratis.protocol.exceptions.NotLeaderException;
3432
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
3533
import org.apache.ratis.util.JavaUtils;
3634
import org.apache.ratis.util.TimeDuration;
@@ -40,29 +38,26 @@
4038

4139
import java.io.IOException;
4240
import java.util.concurrent.CompletableFuture;
43-
import java.util.function.Supplier;
4441

4542
public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
4643

4744
public static final Logger LOG = LoggerFactory.getLogger(NettyClientRpc.class);
4845

49-
private Supplier<String> name;
50-
private ClientId cId;
51-
private final TimeDuration requestTimeoutDuration;
46+
private ClientId clientId;
47+
private final TimeDuration requestTimeout;
5248
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
5349

5450
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
5551
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
56-
this.cId = clientId;
57-
this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
52+
this.clientId = clientId;
53+
this.requestTimeout = RaftClientConfigKeys.Rpc.requestTimeout(properties);
5854
}
5955

6056
@Override
6157
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
6258
final RaftPeerId serverId = request.getServerId();
6359
long callId = request.getCallId();
6460
try {
65-
name = JavaUtils.memoize(() -> cId + "->" + serverId);
6661
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
6762
final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
6863
final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
@@ -75,51 +70,40 @@ public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest req
7570
} else {
7671
return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
7772
}
78-
}).thenCompose(raftReply -> {
79-
final NotLeaderException nle = raftReply.getNotLeaderException();
80-
if (nle != null) {
81-
return failedFuture(nle);
82-
}
83-
final LeaderNotReadyException lnre = raftReply.getLeaderNotReadyException();
84-
if (lnre != null) {
85-
return failedFuture(lnre);
73+
}).whenComplete((reply, e) -> {
74+
if (e == null) {
75+
if (reply == null) {
76+
e = new NullPointerException("Both reply==null && e==null");
77+
}
78+
if (e == null) {
79+
e = reply.getNotLeaderException();
80+
}
81+
if (e == null) {
82+
e = reply.getLeaderNotReadyException();
83+
}
8684
}
87-
return CompletableFuture.completedFuture(raftReply);
88-
}).whenComplete((raftReply, ex) -> {
89-
if (ex != null) {
90-
replyFuture.completeExceptionally(ex);
85+
86+
if (e != null) {
87+
replyFuture.completeExceptionally(e);
9188
} else {
92-
replyFuture.complete(raftReply);
89+
replyFuture.complete(reply);
9390
}
9491
});
9592

96-
scheduler.onTimeout(requestTimeoutDuration,
97-
() -> {
93+
scheduler.onTimeout(requestTimeout, () -> {
9894
if (!replyFuture.isDone()) {
99-
TimeoutIOException timeout = new TimeoutIOException(
100-
getName()+ " request #" + callId + " timeout " +
101-
requestTimeoutDuration.getDuration());
102-
replyFuture.completeExceptionally(timeout);
95+
final String s = clientId + "->" + serverId + " request #" +
96+
callId + " timeout " + requestTimeout.getDuration();
97+
replyFuture.completeExceptionally(new TimeoutIOException(s));
10398
}
104-
}, LOG, () -> "Timeout check for client request #" + callId
105-
);
99+
}, LOG, () -> "Timeout check for client request #" + callId);
106100

107101
return replyFuture;
108102
} catch (Throwable e) {
109103
return JavaUtils.completeExceptionally(e);
110104
}
111105
}
112106

113-
private String getName() {
114-
return name.get();
115-
}
116-
117-
private <T> CompletableFuture<T> failedFuture(Throwable ex) {
118-
CompletableFuture<T> future = new CompletableFuture<>();
119-
future.completeExceptionally(ex);
120-
return future;
121-
}
122-
123107
@Override
124108
public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
125109
final RaftPeerId serverId = request.getServerId();

ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java

Lines changed: 15 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,7 @@
4747
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
4848
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
4949
import org.apache.ratis.util.JavaUtils;
50+
import org.apache.ratis.util.PlatformUtils;
5051
import org.apache.ratis.util.Slf4jUtils;
5152
import org.apache.ratis.util.TimeDuration;
5253
import org.apache.ratis.util.function.CheckedRunnable;
@@ -85,6 +86,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
8586
{
8687
getProperties().setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
8788
SimpleStateMachine4Testing.class, StateMachine.class);
89+
if (!PlatformUtils.LINUX) {
90+
getProperties().setBoolean("raft.netty.server.use-epoll", false);
91+
getProperties().setBoolean("raft.netty.client.use-epoll", false);
92+
}
8893
}
8994

9095
@Test
@@ -110,8 +115,8 @@ public void testAsyncConfiguration() throws IOException {
110115

111116
static void assertRaftRetryFailureException(RaftRetryFailureException rfe, RetryPolicy retryPolicy, String name) {
112117
Assertions.assertNotNull(rfe, name + " does not have RaftRetryFailureException");
113-
Assertions.assertTrue(
114-
rfe.getMessage().contains(retryPolicy.toString()), name + ": unexpected error message, rfe=" + rfe + ", retryPolicy=" + retryPolicy);
118+
Assertions.assertTrue(rfe.getMessage().contains(retryPolicy.toString()),
119+
name + ": unexpected error message, rfe=" + rfe + ", retryPolicy=" + retryPolicy);
115120
}
116121

117122
@Test
@@ -284,8 +289,8 @@ public void testStaleReadAsync() throws Exception {
284289

285290
void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
286291
final int numMessages = 10;
287-
try (RaftClient client = cluster.createClient()) {
288-
RaftTestUtil.waitForLeader(cluster);
292+
RaftServer.Division division = waitForLeader(cluster);
293+
try (RaftClient client = cluster.createClient(division.getId())) {
289294

290295
// submit some messages
291296
final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
@@ -306,6 +311,7 @@ void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
306311
// Use a follower with the max commit index
307312
final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
308313
final RaftPeerId leader = lastWriteReply.getServerId();
314+
Assertions.assertEquals(leader, lastWriteReply.getServerId());
309315
LOG.info("leader = " + leader);
310316
final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
311317
LOG.info("commitInfos = " + commitInfos);
@@ -368,8 +374,8 @@ public void testWriteAsyncCustomReplicationLevel() throws Exception {
368374

369375
void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
370376
final int numMessages = 20;
371-
try (RaftClient client = cluster.createClient()) {
372-
RaftTestUtil.waitForLeader(cluster);
377+
final RaftPeerId leader = waitForLeader(cluster).getId();
378+
try (RaftClient client = cluster.createClient(leader)) {
373379

374380
// submit some messages
375381
for (int i = 0; i < numMessages; i++) {
@@ -419,13 +425,13 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
419425
LOG.info("Running testAppendEntriesTimeout");
420426
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
421427
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
422-
RaftPeerId id = waitForLeader(cluster).getId();
428+
final RaftPeerId leader = waitForLeader(cluster).getId();
423429
long time = System.currentTimeMillis();
424430
long waitTime = 5000;
425431
try (final RaftClient client = cluster.createClient()) {
426432
// block append requests
427433
cluster.getServerAliveStream()
428-
.filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() != id)
434+
.filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
429435
.map(SimpleStateMachine4Testing::get)
430436
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
431437

@@ -435,7 +441,7 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
435441
Assertions.assertFalse(replyFuture.isDone());
436442
// unblock append request.
437443
cluster.getServerAliveStream()
438-
.filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() != id)
444+
.filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
439445
.map(SimpleStateMachine4Testing::get)
440446
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
441447

ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -457,7 +457,7 @@ public void testStateMachineMetrics() throws Exception {
457457

458458
static void runTestStateMachineMetrics(boolean async, MiniRaftCluster cluster) throws Exception {
459459
RaftServer.Division leader = waitForLeader(cluster);
460-
try (final RaftClient client = cluster.createClient()) {
460+
try (final RaftClient client = cluster.createClient(leader.getId())) {
461461
Gauge appliedIndexGauge = getStatemachineGaugeWithName(leader,
462462
STATEMACHINE_APPLIED_INDEX_GAUGE);
463463
Gauge smAppliedIndexGauge = getStatemachineGaugeWithName(leader,

0 commit comments

Comments
 (0)