Skip to content

Commit 6418a62

Browse files
committed
RATIS-2400. Support timeout and interrupt handling in GrpcClientRpc.
1 parent b02fed5 commit 6418a62

File tree

2 files changed

+90
-1
lines changed

2 files changed

+90
-1
lines changed

ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java

Lines changed: 36 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -17,13 +17,16 @@
1717
*/
1818
package org.apache.ratis.grpc.client;
1919

20+
import org.apache.ratis.client.RaftClientConfigKeys;
2021
import org.apache.ratis.client.impl.ClientProtoUtils;
2122
import org.apache.ratis.client.impl.RaftClientRpcWithProxy;
2223
import org.apache.ratis.conf.RaftProperties;
2324
import org.apache.ratis.grpc.GrpcConfigKeys;
2425
import org.apache.ratis.grpc.GrpcUtil;
2526
import org.apache.ratis.protocol.*;
2627
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
28+
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
29+
import org.apache.ratis.thirdparty.io.grpc.Status;
2730
import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
2831
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
2932
import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
@@ -39,26 +42,34 @@
3942
import org.apache.ratis.util.IOUtils;
4043
import org.apache.ratis.util.JavaUtils;
4144
import org.apache.ratis.util.PeerProxyMap;
45+
import org.apache.ratis.util.TimeDuration;
46+
import org.apache.ratis.util.TimeoutExecutor;
4247
import org.slf4j.Logger;
4348
import org.slf4j.LoggerFactory;
4449

4550
import java.io.IOException;
4651
import java.io.InterruptedIOException;
4752
import java.util.concurrent.CompletableFuture;
4853
import java.util.concurrent.ExecutionException;
54+
import java.util.concurrent.TimeUnit;
4955

5056
public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClient> {
5157
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientRpc.class);
5258

5359
private final ClientId clientId;
5460
private final int maxMessageSize;
61+
private final TimeDuration requestTimeoutDuration;
62+
private final TimeDuration watchRequestTimeoutDuration;
63+
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
5564

5665
/**
 * Creates the gRPC client RPC for the given client.
 *
 * The superclass is given a {@link PeerProxyMap} named after {@code clientId} whose
 * factory builds one {@link GrpcClientProtocolClient} per peer. The maximum message
 * size and the two request timeouts are read from {@code properties} once, here.
 *
 * @param clientId the id of this client; also used (via {@code toString()}) as the proxy map name
 * @param properties the configuration from which message size and timeouts are read
 * @param adminSslContext passed through to every {@link GrpcClientProtocolClient} created by the proxy map
 * @param clientSslContext passed through to every {@link GrpcClientProtocolClient} created by the proxy map
 */
public GrpcClientRpc(ClientId clientId, RaftProperties properties,
5766
SslContext adminSslContext, SslContext clientSslContext) {
5867
super(new PeerProxyMap<>(clientId.toString(),
5968
p -> new GrpcClientProtocolClient(clientId, p, properties, adminSslContext, clientSslContext)));
6069
this.clientId = clientId;
6170
this.maxMessageSize = GrpcConfigKeys.messageSizeMax(properties, LOG::debug).getSizeInt();
71+
// Default timeout for requests that carry no timeout of their own (see getTimeoutDuration).
this.requestTimeoutDuration = RaftClientConfigKeys.Rpc.requestTimeout(properties);
72+
// Separate default used only for WATCH requests (see getTimeoutDuration).
this.watchRequestTimeoutDuration = RaftClientConfigKeys.Rpc.watchRequestTimeout(properties);
6273
}
6374

6475
@Override
@@ -122,11 +133,11 @@ public RaftClientReply sendRequest(RaftClientRequest request)
122133
return ClientProtoUtils.toRaftClientReply(proxy.leaderElectionManagement(proto));
123134
} else {
124135
final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
125-
// TODO: timeout support
126136
try {
127137
return f.get();
128138
} catch (InterruptedException e) {
129139
Thread.currentThread().interrupt();
140+
f.cancel(true);
130141
throw new InterruptedIOException(
131142
"Interrupted while waiting for response of request " + request);
132143
} catch (ExecutionException e) {
@@ -143,6 +154,7 @@ private CompletableFuture<RaftClientReply> sendRequest(
143154
final RaftClientRequestProto requestProto =
144155
toRaftClientRequestProto(request);
145156
final CompletableFuture<RaftClientReplyProto> replyFuture = new CompletableFuture<>();
157+
final TimeDuration timeout = getTimeoutDuration(request);
146158
// create a new grpc stream for each non-async call.
147159
final StreamObserver<RaftClientRequestProto> requestObserver =
148160
proxy.unorderedWithTimeout(new StreamObserver<RaftClientReplyProto>() {
@@ -164,12 +176,35 @@ public void onCompleted() {
164176
}
165177
}
166178
});
179+
replyFuture.whenComplete((reply, exception) -> {
180+
if (replyFuture.isCancelled()) {
181+
requestObserver.onError(Status.CANCELLED
182+
.withDescription(clientId + ": request #" + request.getCallId() + " cancelled")
183+
.asRuntimeException());
184+
}
185+
});
186+
scheduler.onTimeout(timeout, () -> {
187+
if (replyFuture.completeExceptionally(new TimeoutIOException(
188+
clientId + ": request #" + request.getCallId() + " timeout " + timeout))) {
189+
requestObserver.onError(Status.DEADLINE_EXCEEDED
190+
.withDescription(clientId + ": request #" + request.getCallId() + " timeout " + timeout)
191+
.asRuntimeException());
192+
}
193+
}, LOG, () -> "Timeout check failed for client request #" + request.getCallId());
167194
requestObserver.onNext(requestProto);
168195
requestObserver.onCompleted();
169196

170197
return replyFuture.thenApply(ClientProtoUtils::toRaftClientReply);
171198
}
172199

200+
/**
 * Chooses the timeout to apply to the given request.
 *
 * A positive {@code request.getTimeoutMs()} takes precedence and is used as-is
 * (in milliseconds). Otherwise the configured default applies: the watch-request
 * timeout for WATCH requests, the general request timeout for everything else.
 *
 * @param request the request being sent
 * @return the timeout duration for {@code request}
 */
private TimeDuration getTimeoutDuration(RaftClientRequest request) {
201+
final long timeoutMs = request.getTimeoutMs();
202+
// Zero or negative means the request did not specify its own timeout.
if (timeoutMs > 0) {
203+
return TimeDuration.valueOf(timeoutMs, TimeUnit.MILLISECONDS);
204+
}
205+
return request.is(RaftClientRequestProto.TypeCase.WATCH) ? watchRequestTimeoutDuration : requestTimeoutDuration;
206+
}
207+
173208
private RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) throws IOException {
174209
final RaftClientRequestProto proto = ClientProtoUtils.toRaftClientRequestProto(request);
175210
if (proto.getSerializedSize() > maxMessageSize) {

ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,7 @@
6969
import javax.net.ssl.KeyManager;
7070
import javax.net.ssl.TrustManager;
7171
import java.io.IOException;
72+
import java.io.InterruptedIOException;
7273
import java.nio.channels.OverlappingFileLockException;
7374
import java.util.ArrayList;
7475
import java.util.Arrays;
@@ -81,6 +82,7 @@
8182
import java.util.concurrent.ThreadLocalRandom;
8283
import java.util.concurrent.TimeUnit;
8384
import java.util.concurrent.atomic.AtomicLong;
85+
import java.util.concurrent.atomic.AtomicReference;
8486

8587
public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
8688
{
@@ -238,6 +240,58 @@ public void testRaftClientMetrics(Boolean separateHeartbeat) throws Exception {
238240
runWithNewCluster(3, this::testRaftClientRequestMetrics);
239241
}
240242

243+
/**
 * Verifies that a blocking send fails with {@link TimeoutIOException} when the
 * leader's state machine never starts the transaction.
 *
 * NOTE(review): this test does not set a request timeout itself, so how long it
 * waits depends on whatever getProperties() already holds or the configured
 * default -- confirm that keeps the test reasonably fast.
 *
 * @param separateHeartbeat value applied to the server heartbeat-channel setting
 */
@ParameterizedTest
244+
@MethodSource("data")
245+
public void testGrpcClientRpcSyncTimeout(Boolean separateHeartbeat) throws Exception {
246+
GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
247+
runWithNewCluster(3, cluster -> {
248+
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
249+
// noRetry: the first failure must surface to the caller instead of being retried.
try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
250+
final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(cluster.getLeader());
251+
// Stall the leader so that the request cannot complete before the timeout.
stateMachine.blockStartTransaction();
252+
try {
253+
Assertions.assertThrows(TimeoutIOException.class,
254+
() -> client.io().send(new SimpleMessage("sync-timeout")));
255+
} finally {
256+
// Always unblock, even when the assertion fails.
stateMachine.unblockStartTransaction();
257+
}
258+
}
259+
});
260+
}
261+
262+
/**
 * Verifies that interrupting a thread blocked in a synchronous send makes the
 * call return promptly with {@link InterruptedIOException}.
 *
 * The request timeout is set to 10 seconds while the interrupted thread is given
 * 5 seconds to exit, so a pass cannot be caused by the request simply timing out.
 *
 * NOTE(review): the assertions cover only thread exit and the exception type;
 * nothing here observes whether the in-flight gRPC request was actually
 * cancelled. Worth confirming separately: the interrupt path cancels the future
 * returned by thenApply(...), and CompletableFuture does not propagate a cancel
 * from a dependent stage back to its source.
 *
 * @param separateHeartbeat value applied to the server heartbeat-channel setting
 */
@ParameterizedTest
263+
@MethodSource("data")
264+
public void testGrpcClientRpcSyncCancelOnInterrupt(Boolean separateHeartbeat) throws Exception {
265+
RaftClientConfigKeys.Rpc.setRequestTimeout(getProperties(), TimeDuration.valueOf(10, TimeUnit.SECONDS));
266+
GrpcConfigKeys.Server.setHeartbeatChannel(getProperties(), separateHeartbeat);
267+
runWithNewCluster(3, cluster -> {
268+
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
269+
try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
270+
final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(cluster.getLeader());
271+
// Stall the leader so that the send stays blocked until it is interrupted.
stateMachine.blockStartTransaction();
272+
try {
273+
// Carries whatever the sender thread threw back to the test thread.
final AtomicReference<Throwable> error = new AtomicReference<>();
274+
final Thread t = new Thread(() -> {
275+
try {
276+
client.io().send(new SimpleMessage("sync-cancel"));
277+
} catch (Throwable e) {
278+
error.set(e);
279+
}
280+
});
281+
t.start();
282+
// NOTE(review): fixed delay intended to let the sender reach the blocking wait;
// presumably long enough in practice, but it is timing-dependent.
Thread.sleep(200);
283+
t.interrupt();
284+
t.join(5000);
285+
Assertions.assertFalse(t.isAlive(), "request thread should exit after interrupt");
286+
Assertions.assertTrue(error.get() instanceof InterruptedIOException,
287+
"expected InterruptedIOException but got " + error.get());
288+
} finally {
289+
// Always unblock, even when an assertion fails.
stateMachine.unblockStartTransaction();
290+
}
291+
}
292+
});
293+
}
294+
241295
@ParameterizedTest
242296
@MethodSource("data")
243297
public void testRaftServerMetrics(Boolean separateHeartbeat) throws Exception {

0 commit comments

Comments
 (0)