|
17 | 17 | */ |
18 | 18 | package org.apache.ratis.netty.client; |
19 | 19 |
|
| 20 | +import org.apache.ratis.client.RaftClientConfigKeys; |
20 | 21 | import org.apache.ratis.client.impl.ClientProtoUtils; |
21 | 22 | import org.apache.ratis.client.impl.RaftClientRpcWithProxy; |
22 | 23 | import org.apache.ratis.conf.RaftProperties; |
|
28 | 29 | import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto; |
29 | 30 | import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto; |
30 | 31 | import org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerRequestProto; |
| 32 | +import org.apache.ratis.protocol.exceptions.LeaderNotReadyException; |
| 33 | +import org.apache.ratis.protocol.exceptions.NotLeaderException; |
| 34 | +import org.apache.ratis.protocol.exceptions.TimeoutIOException; |
31 | 35 | import org.apache.ratis.util.JavaUtils; |
| 36 | +import org.apache.ratis.util.TimeDuration; |
| 37 | +import org.apache.ratis.util.TimeoutExecutor; |
| 38 | +import org.slf4j.Logger; |
| 39 | +import org.slf4j.LoggerFactory; |
32 | 40 |
|
33 | 41 | import java.io.IOException; |
34 | 42 | import java.util.concurrent.CompletableFuture; |
| 43 | +import java.util.function.Supplier; |
35 | 44 |
|
36 | 45 | public class NettyClientRpc extends RaftClientRpcWithProxy<NettyRpcProxy> { |
| 46 | + |
| 47 | + public static final Logger LOG = LoggerFactory.getLogger(NettyClientRpc.class); |
| 48 | + |
| 49 | + private Supplier<String> name; |
| 50 | + private ClientId cId; |
| 51 | + private final TimeDuration requestTimeoutDuration; |
| 52 | + private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance(); |
| 53 | + |
37 | 54 | public NettyClientRpc(ClientId clientId, RaftProperties properties) { |
38 | 55 | super(new NettyRpcProxy.PeerMap(clientId.toString(), properties)); |
| 56 | + this.cId = clientId; |
| 57 | + this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties); |
39 | 58 | } |
40 | 59 |
|
41 | 60 | @Override |
42 | 61 | public CompletableFuture<RaftClientReply> sendRequestAsync(RaftClientRequest request) { |
43 | 62 | final RaftPeerId serverId = request.getServerId(); |
| 63 | + long callId = request.getCallId(); |
44 | 64 | try { |
| 65 | + name = JavaUtils.memoize(() -> cId + "->" + serverId); |
45 | 66 | final NettyRpcProxy proxy = getProxies().getProxy(serverId); |
46 | 67 | final RaftNettyServerRequestProto serverRequestProto = buildRequestProto(request); |
47 | | - return proxy.sendAsync(serverRequestProto).thenApply(replyProto -> { |
| 68 | + final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>(); |
| 69 | + |
| 70 | + proxy.sendAsync(serverRequestProto).thenApply(replyProto -> { |
48 | 71 | if (request instanceof GroupListRequest) { |
49 | 72 | return ClientProtoUtils.toGroupListReply(replyProto.getGroupListReply()); |
50 | 73 | } else if (request instanceof GroupInfoRequest) { |
51 | 74 | return ClientProtoUtils.toGroupInfoReply(replyProto.getGroupInfoReply()); |
52 | 75 | } else { |
53 | 76 | return ClientProtoUtils.toRaftClientReply(replyProto.getRaftClientReply()); |
54 | 77 | } |
| 78 | + }).thenCompose(raftReply -> { |
| 79 | + final NotLeaderException nle = raftReply.getNotLeaderException(); |
| 80 | + if (nle != null) { |
| 81 | + return failedFuture(nle); |
| 82 | + } |
| 83 | + final LeaderNotReadyException lnre = raftReply.getLeaderNotReadyException(); |
| 84 | + if (lnre != null) { |
| 85 | + return failedFuture(lnre); |
| 86 | + } |
| 87 | + return CompletableFuture.completedFuture(raftReply); |
| 88 | + }).whenComplete((raftReply, ex) -> { |
| 89 | + if (ex != null) { |
| 90 | + replyFuture.completeExceptionally(ex); |
| 91 | + } else { |
| 92 | + replyFuture.complete(raftReply); |
| 93 | + } |
55 | 94 | }); |
| 95 | + |
| 96 | + scheduler.onTimeout(requestTimeoutDuration, |
| 97 | + () -> { |
| 98 | + if (!replyFuture.isDone()) { |
| 99 | + TimeoutIOException timeout = new TimeoutIOException( |
| 100 | + getName()+ " request #" + callId + " timeout " + |
| 101 | + requestTimeoutDuration.getDuration()); |
| 102 | + replyFuture.completeExceptionally(timeout); |
| 103 | + } |
| 104 | + }, LOG, () -> "Timeout check for client request #" + callId |
| 105 | + ); |
| 106 | + |
| 107 | + return replyFuture; |
56 | 108 | } catch (Throwable e) { |
57 | 109 | return JavaUtils.completeExceptionally(e); |
58 | 110 | } |
59 | 111 | } |
60 | 112 |
|
| 113 | + private String getName() { |
| 114 | + return name.get(); |
| 115 | + } |
| 116 | + |
| 117 | + private <T> CompletableFuture<T> failedFuture(Throwable ex) { |
| 118 | + CompletableFuture<T> future = new CompletableFuture<>(); |
| 119 | + future.completeExceptionally(ex); |
| 120 | + return future; |
| 121 | + } |
| 122 | + |
61 | 123 | @Override |
62 | 124 | public RaftClientReply sendRequest(RaftClientRequest request) throws IOException { |
63 | 125 | final RaftPeerId serverId = request.getServerId(); |
|
0 commit comments