Skip to content

Commit 0d464a7

Browse files
committed
RATIS-2294. Improve Some Code.
1 parent f2c90c6 commit 0d464a7

File tree

2 files changed

+30
-42
lines changed

2 files changed

+30
-42
lines changed

ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientRpc.java

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,21 @@ public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> {
4646

4747
public static final Logger LOG = LoggerFactory.getLogger(NettyClientRpc.class);
4848

49-
private Supplier<String> name;
50-
private ClientId cId;
51-
private final TimeDuration requestTimeoutDuration;
49+
private ClientId clientId;
50+
private final TimeDuration requestTimeout;
5251
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
5352

5453
public NettyClientRpc(ClientId clientId, RaftProperties properties) {
5554
super(new NettyRpcProxy.PeerMap(clientId.toString(), properties));
56-
this.cId = clientId;
57-
this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
55+
this.clientId = clientId;
56+
this.requestTimeout = RaftClientConfigKeys.Rpc.requestTimeout(properties);
5857
}
5958

6059
@Override
6160
public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) {
6261
final RaftPeerId serverId = request.getServerId();
6362
long callId = request.getCallId();
6463
try {
65-
name = JavaUtils.memoize(() -> cId + "->" + serverId);
6664
final NettyRpcProxy proxy = getProxies().getProxy(serverId);
6765
final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request);
6866
final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
@@ -75,51 +73,40 @@ public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest req
7573
} else {
7674
return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply());
7775
}
78-
}).thenCompose(raftReply -> {
79-
final NotLeaderException nle = raftReply.getNotLeaderException();
80-
if (nle != null) {
81-
return failedFuture(nle);
82-
}
83-
final LeaderNotReadyException lnre = raftReply.getLeaderNotReadyException();
84-
if (lnre != null) {
85-
return failedFuture(lnre);
76+
}).whenComplete((reply, e) -> {
77+
if (e == null) {
78+
if (reply == null) {
79+
e = new NullPointerException("Both reply==null && e==null");
80+
}
81+
if (e == null) {
82+
e = reply.getNotLeaderException();
83+
}
84+
if (e == null) {
85+
e = reply.getLeaderNotReadyException();
86+
}
8687
}
87-
return CompletableFuture.completedFuture(raftReply);
88-
}).whenComplete((raftReply, ex) -> {
89-
if (ex != null) {
90-
replyFuture.completeExceptionally(ex);
88+
89+
if (e != null) {
90+
replyFuture.completeExceptionally(e);
9191
} else {
92-
replyFuture.complete(raftReply);
92+
replyFuture.complete(reply);
9393
}
9494
});
9595

96-
scheduler.onTimeout(requestTimeoutDuration,
97-
() -> {
96+
scheduler.onTimeout(requestTimeout, () -> {
9897
if (!replyFuture.isDone()) {
99-
TimeoutIOException timeout = new TimeoutIOException(
100-
getName()+ " request #" + callId + " timeout " +
101-
requestTimeoutDuration.getDuration());
102-
replyFuture.completeExceptionally(timeout);
98+
final String s = clientId + "->" + serverId + " request #" +
99+
callId + " timeout " + requestTimeout.getDuration();
100+
replyFuture.completeExceptionally(new TimeoutIOException(s));
103101
}
104-
}, LOG, () -> "Timeout check for client request #" + callId
105-
);
102+
}, LOG, () -> "Timeout check for client request #" + callId);
106103

107104
return replyFuture;
108105
} catch (Throwable e) {
109106
return JavaUtils.completeExceptionally(e);
110107
}
111108
}
112109

113-
private String getName() {
114-
return name.get();
115-
}
116-
117-
private <T> CompletableFuture<T> failedFuture(Throwable ex) {
118-
CompletableFuture<T> future = new CompletableFuture<>();
119-
future.completeExceptionally(ex);
120-
return future;
121-
}
122-
123110
@Override
124111
public RaftClientReply sendRequest(RaftClientRequest request) throws IOException {
125112
final RaftPeerId serverId = request.getServerId();

ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ void runTestStaleReadAsync(CLUSTER cluster) throws Exception {
309309
// Use a follower with the max commit index
310310
final RaftClientReply lastWriteReply = replies.get(replies.size() - 1);
311311
final RaftPeerId leader = lastWriteReply.getServerId();
312+
Assert.assertEquals(leader, lastWriteReply.getServerId());
312313
LOG.info("leader = " + leader);
313314
final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
314315
LOG.info("commitInfos = " + commitInfos);
@@ -371,8 +372,8 @@ public void testWriteAsyncCustomReplicationLevel() throws Exception {
371372

372373
void runTestWriteAsyncCustomReplicationLevel(CLUSTER cluster) throws Exception {
373374
final int numMessages = 20;
374-
try (RaftClient client = cluster.createClient()) {
375-
RaftTestUtil.waitForLeader(cluster);
375+
final RaftPeerId leader = waitForLeader(cluster).getId();
376+
try (RaftClient client = cluster.createClient(leader)) {
376377

377378
// submit some messages
378379
for (int i = 0; i < numMessages; i++) {
@@ -422,13 +423,13 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
422423
LOG.info("Running testAppendEntriesTimeout");
423424
final TimeDuration oldExpiryTime = RaftServerConfigKeys.RetryCache.expiryTime(getProperties());
424425
RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(20, TimeUnit.SECONDS));
425-
RaftPeerId id = waitForLeader(cluster).getId();
426+
final RaftPeerId leader = waitForLeader(cluster).getId();
426427
long time = System.currentTimeMillis();
427428
long waitTime = 5000;
428429
try (final RaftClient client = cluster.createClient()) {
429430
// block append requests
430431
cluster.getServerAliveStream()
431-
.filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() != id)
432+
.filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
432433
.map(SimpleStateMachine4Testing::get)
433434
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
434435

@@ -438,7 +439,7 @@ void runTestAppendEntriesTimeout(CLUSTER cluster) throws Exception {
438439
Assert.assertFalse(replyFuture.isDone());
439440
// unblock append request.
440441
cluster.getServerAliveStream()
441-
.filter(impl -> !impl.getInfo().isLeader() && impl.getPeer().getId() != id)
442+
.filter(impl -> !impl.getInfo().isLeader() && !impl.getPeer().getId().equals(leader))
442443
.map(SimpleStateMachine4Testing::get)
443444
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
444445

0 commit comments

Comments
 (0)