Skip to content

Commit 2fb805d

Browse files
committed
RATIS-1368. Fix server impl NPEs
1 parent 43915d2 commit 2fb805d

File tree

6 files changed

+106
-29
lines changed

6 files changed

+106
-29
lines changed

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -842,10 +842,15 @@ private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request
842842
newExceptionReply(request, generateNotLeaderException())));
843843
}
844844

845-
private CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
845+
CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftClientRequest request) {
846846
return role.getLeaderState()
847-
.map(ls -> ls.streamEndOfRequestAsync(request))
848-
.orElse(null);
847+
.map(ls -> ls.streamEndOfRequestAsync(request))
848+
.orElseGet(() -> {
849+
final CompletableFuture<RaftClientRequest> errorF = new CompletableFuture<>();
850+
errorF.completeExceptionally(
851+
new Exception("Unexpected null encountered, while receiving end of stream request."));
852+
return errorF;
853+
});
849854
}
850855

851856
CompletableFuture<RaftClientReply> processQueryFuture(
@@ -1560,21 +1565,28 @@ private InstallSnapshotReplyProto notifyStateMachineToInstallSnapshot(
15601565
return reply;
15611566
}
15621567

1563-
Optional<RaftPeerProto> leaderPeerInfo = null;
1568+
RaftPeerProto leaderPeerInfo = null;
15641569
if (request.hasLastRaftConfigurationLogEntryProto()) {
15651570
List<RaftPeerProto> peerList = request.getLastRaftConfigurationLogEntryProto().getConfigurationEntry()
15661571
.getPeersList();
1567-
leaderPeerInfo = peerList.stream().filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
1568-
Preconditions.assertTrue(leaderPeerInfo.isPresent());
1572+
Optional<RaftPeerProto> optionalLeaderPeerInfo = peerList.stream()
1573+
.filter(p -> RaftPeerId.valueOf(p.getId()).equals(leaderId)).findFirst();
1574+
leaderPeerInfo = (optionalLeaderPeerInfo.isPresent()) ? optionalLeaderPeerInfo.get() : null;
15691575
}
15701576

15711577
// For the cases where RaftConf is empty on newly started peer with
15721578
// empty peer list, we retrieve leader info from
15731579
// installSnapShotRequestProto.
1574-
RoleInfoProto roleInfoProto =
1575-
getRaftConf().getPeer(state.getLeaderId()) == null ?
1576-
getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo.get())) :
1577-
getRoleInfoProto();
1580+
RoleInfoProto roleInfoProto;
1581+
RaftPeer raftPeer = getRaftConf().getPeer(state.getLeaderId());
1582+
if (raftPeer == null && leaderPeerInfo != null) {
1583+
roleInfoProto = getRoleInfoProto(ProtoUtils.toRaftPeer(leaderPeerInfo));
1584+
} else if (raftPeer == null) {
1585+
throw new IOException("Leader peer info is unknown.");
1586+
} else {
1587+
roleInfoProto = getRoleInfoProto();
1588+
}
1589+
15781590
// This is the first installSnapshot notify request for this term and
15791591
// index. Notify the state machine to install the snapshot.
15801592
LOG.info("{}: notifyInstallSnapshot: nextIndex is {} but the leader's first available index is {}.",
@@ -1633,7 +1645,7 @@ private CompletableFuture<Message> replyPendingRequest(
16331645
// update the retry cache
16341646
final CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
16351647
if (getInfo().isLeader()) {
1636-
Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(),
1648+
Preconditions.assertTrue(!cacheEntry.isCompletedNormally(),
16371649
"retry cache entry should be pending: %s", cacheEntry);
16381650
}
16391651
if (cacheEntry.isFailed()) {

ratis-server/src/main/java/org/apache/ratis/server/impl/RetryCacheImpl.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,11 @@ public String toString() {
181181
}
182182

183183
CacheEntry getOrCreateEntry(ClientInvocationId key) {
184-
final CacheEntry entry;
185184
try {
186-
entry = cache.get(key, () -> new CacheEntry(key));
185+
return cache.get(key, () -> new CacheEntry(key));
187186
} catch (ExecutionException e) {
188187
throw new IllegalStateException(e);
189188
}
190-
return entry;
191189
}
192190

193191
CacheEntry refreshEntry(CacheEntry newEntry) {

ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020
import org.apache.log4j.Level;
2121
import org.apache.ratis.RaftTestUtil;
2222
import org.apache.ratis.conf.RaftProperties;
23-
import org.apache.ratis.protocol.RaftGroupId;
24-
import org.apache.ratis.protocol.RaftGroupMemberId;
25-
import org.apache.ratis.protocol.RaftPeer;
26-
import org.apache.ratis.protocol.RaftPeerId;
23+
import org.apache.ratis.protocol.*;
2724
import org.apache.ratis.server.DataStreamMap;
2825
import org.apache.ratis.server.DataStreamServer;
2926
import org.apache.ratis.server.DivisionInfo;
@@ -44,6 +41,7 @@
4441
import java.util.Arrays;
4542
import java.util.Collection;
4643
import java.util.Optional;
44+
import java.util.concurrent.CompletableFuture;
4745
import java.util.stream.Stream;
4846

4947
public class RaftServerTestUtil {
@@ -156,6 +154,10 @@ public static DataStreamMap newDataStreamMap(Object name) {
156154
return new DataStreamMapImpl(name);
157155
}
158156

157+
public static CompletableFuture<RaftClientRequest> streamEndOfRequestAsync(RaftServer.Division server, RaftClientRequest request) {
158+
return ((RaftServerImpl)server).streamEndOfRequestAsync(request);
159+
}
160+
159161
public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
160162
final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
161163
Assert.assertNotNull(f);

ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,15 @@ public static void assertFailure(RetryCache cache, LogEntryProto logEntry, boole
5959
}
6060
}
6161

62-
public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
63-
getOrCreateEntry(server.getRetryCache(), invocationId);
62+
public static RetryCache.Entry getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
63+
return getOrCreateEntryImpl(server.getRetryCache(), invocationId);
6464
}
6565

66-
private static RetryCache.Entry getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) {
66+
public static RetryCache.Entry getOrCreateEntry(RetryCache retryCache, ClientInvocationId invocationId) {
67+
return getOrCreateEntryImpl(retryCache, invocationId);
68+
}
69+
70+
private static RetryCache.Entry getOrCreateEntryImpl(RetryCache cache, ClientInvocationId invocationId) {
6771
return ((RetryCacheImpl)cache).getOrCreateEntry(invocationId);
6872
}
6973

ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
import org.apache.commons.lang3.RandomStringUtils;
3131
import org.apache.log4j.Level;
3232
import org.apache.ratis.BaseTest;
33-
import org.apache.ratis.protocol.RaftGroup;
33+
import org.apache.ratis.protocol.*;
34+
import org.apache.ratis.protocol.exceptions.StreamException;
3435
import org.apache.ratis.server.impl.MiniRaftCluster;
3536
import org.apache.ratis.RaftTestUtil;
3637
import org.apache.ratis.RaftTestUtil.SimpleMessage;
@@ -44,9 +45,6 @@
4445
import org.apache.ratis.proto.RaftProtos;
4546
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
4647
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
47-
import org.apache.ratis.protocol.RaftClientReply;
48-
import org.apache.ratis.protocol.RaftClientRequest;
49-
import org.apache.ratis.protocol.RaftPeerId;
5048
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
5149
import org.apache.ratis.retry.RetryPolicies;
5250
import org.apache.ratis.server.RaftServer;
@@ -74,6 +72,7 @@
7472
import java.util.concurrent.ExecutionException;
7573
import java.util.concurrent.TimeUnit;
7674
import java.util.concurrent.atomic.AtomicLong;
75+
import java.util.function.Consumer;
7776

7877
public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
7978
{
@@ -311,9 +310,50 @@ void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOExce
311310
}
312311
}
313312

313+
@Test
314+
public void TestStreamEndOfRequestAsync() throws Exception {
315+
runWithNewCluster(1, this::runTestStreamEndOfRequestAsync);
316+
}
317+
318+
void runTestStreamEndOfRequestAsync(MiniRaftClusterWithGrpc cluster) throws Exception {
319+
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
320+
final RaftPeerId leaderId = leader.getId();
321+
final RaftGroupId leaderGroupId = leader.getGroup().getGroupId();
322+
final RaftClient client = cluster.createClient();
323+
final AtomicLong seqNum = new AtomicLong();
324+
final RaftClientRequest clientRequest = newRaftClientRequest(client, leaderId, seqNum.incrementAndGet(),
325+
RaftClientRequest.messageStreamRequestType(12, 12, true));
326+
327+
// Leader completes exceptionally, because there is no such stream
328+
// Creating realistic stream is complex, since streams are created by clients, but
329+
// this function tests server functionality.
330+
CompletableFuture<RaftClientRequest> fRequest = RaftServerTestUtil.streamEndOfRequestAsync(leader, clientRequest);
331+
Assert.assertNotNull(fRequest);
332+
Assert.assertTrue(fRequest.isCompletedExceptionally());
333+
fRequest.exceptionally(e -> {
334+
Assert.assertSame(e.getCause().getClass(), StreamException.class);
335+
return clientRequest;
336+
});
337+
338+
// On non leader, request should fail because only leaders handle this kind of requests
339+
RaftServer server = cluster.putNewServer(RaftPeerId.getRaftPeerId("Server 21"), leader.getGroup(), false);
340+
fRequest = RaftServerTestUtil.streamEndOfRequestAsync(server.getDivision(leaderGroupId), clientRequest);
341+
Assert.assertNotNull(fRequest);
342+
Assert.assertTrue(fRequest.isCompletedExceptionally());
343+
fRequest.exceptionally(e -> {
344+
Assert.assertSame(e.getCause().getClass(), Exception.class);
345+
return clientRequest;
346+
});
347+
}
348+
314349
static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) {
350+
return newRaftClientRequest(client, serverId, seqNum, RaftClientRequest.writeRequestType());
351+
}
352+
353+
static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum,
354+
RaftClientRequest.Type type) {
315355
final SimpleMessage m = new SimpleMessage("m" + seqNum);
316356
return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,
317-
RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
357+
type, ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
318358
}
319359
}

ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@
2424
import org.apache.ratis.RaftTestUtil.SimpleOperation;
2525
import org.apache.ratis.conf.RaftProperties;
2626
import org.apache.ratis.metrics.RatisMetricRegistry;
27-
import org.apache.ratis.protocol.RaftGroupId;
28-
import org.apache.ratis.protocol.RaftGroupMemberId;
29-
import org.apache.ratis.protocol.RaftPeerId;
27+
import org.apache.ratis.protocol.*;
3028
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
3129
import org.apache.ratis.server.RaftServerConfigKeys;
3230
import org.apache.ratis.server.impl.RetryCacheTestUtil;
@@ -636,6 +634,29 @@ public void notifyLogFailed(Throwable cause, LogEntryProto entry) {
636634
throw ex;
637635
}
638636

637+
/**
638+
* Verifies that getOrCreateEntry function creates cache entry in every case and does not return null.
639+
*/
640+
@Test
641+
public void testGetOrCreateEntry() {
642+
final RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
643+
final ClientId clientId = ClientId.randomId();
644+
final long invocationId1 = 123456789;
645+
final ClientInvocationId clientInvocationId1 = ClientInvocationId.valueOf(clientId, invocationId1);
646+
RetryCache.Entry cacheEntry1 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1);
647+
Assert.assertNotNull(cacheEntry1);
648+
649+
RetryCache.Entry cacheEntry1Again = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId1);
650+
Assert.assertEquals(cacheEntry1.toString(), cacheEntry1Again.toString());
651+
652+
final long invocationId2 = 987654321;
653+
final ClientInvocationId clientInvocationId2 = ClientInvocationId.valueOf(clientId, invocationId2);
654+
RetryCache.Entry cacheEntry2 = RetryCacheTestUtil.getOrCreateEntry(retryCache, clientInvocationId2);
655+
Assert.assertNotNull(cacheEntry2);
656+
657+
Assert.assertNotEquals(cacheEntry1.toString(), cacheEntry2.toString());
658+
}
659+
639660
static Thread startAppendEntryThread(RaftLog raftLog, LogEntryProto entry) {
640661
final Thread t = new Thread(() -> {
641662
try {

0 commit comments

Comments
 (0)