Skip to content

Commit 9b132a3

Browse files
committed
Add a DUMMY message to the dummy watch request.
1 parent 456c1e3 commit 9b132a3

7 files changed

Lines changed: 43 additions & 12 deletions

File tree

ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@
5858
public final class OrderedAsync {
5959
public static final Logger LOG = LoggerFactory.getLogger(OrderedAsync.class);
6060

61+
public static final Message DUMMY = Message.valueOf("DUMMY");
62+
6163
private enum BatchLogKey implements BatchLogger.Key {
6264
SEND_REQUEST_EXCEPTION
6365
}
@@ -121,7 +123,7 @@ static OrderedAsync newInstance(RaftClientImpl client, RaftPeerId server, RaftPr
121123
// send a dummy watch request to establish the connection
122124
// TODO: this is a work around, it is better to fix the underlying RPC implementation
123125
if (RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties)) {
124-
ordered.send(RaftClientRequest.watchRequestType(), null, server);
126+
ordered.send(RaftClientRequest.watchRequestType(), DUMMY, server);
125127
}
126128
return ordered;
127129
}

ratis-common/src/main/java/org/apache/ratis/protocol/Message.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.ratis.thirdparty.com.google.protobuf.AbstractMessage;
2121
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
22+
import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
2223
import org.apache.ratis.util.MemoizedSupplier;
2324
import org.apache.ratis.util.StringUtils;
2425

@@ -47,11 +48,11 @@ public String toString() {
4748
}
4849

4950
static Message valueOf(AbstractMessage abstractMessage) {
50-
return valueOf(abstractMessage.toByteString(), abstractMessage::toString);
51+
return valueOf(abstractMessage.toByteString(), () -> TextFormat.shortDebugString(abstractMessage));
5152
}
5253

5354
static Message valueOf(ByteString bytes) {
54-
return valueOf(bytes, () -> "Message:" + StringUtils.bytes2HexShortString(bytes));
55+
return valueOf(bytes, () -> "Message:" + StringUtils.bytes2ShortString(bytes));
5556
}
5657

5758
static Message valueOf(String string) {

ratis-common/src/main/java/org/apache/ratis/util/MemoizedFunction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private MemoizedFunction(Function<PARAMETER, RETURN> initializer) {
5858
/**
5959
* @param parameter for passing to the initializer.
6060
* Since the returned function is memoized, the parameter is only used at the first call.
61-
* The parameter in the subsequent calls is ignored.
61+
* The parameter is ignored in the subsequent calls.
6262
*
6363
* @return the lazily initialized object.
6464
*/

ratis-common/src/main/java/org/apache/ratis/util/StringUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,23 @@ public static String format(final String format, final Object... objects) {
8181
return String.format(Locale.ENGLISH, format, objects);
8282
}
8383

84+
public static String bytes2ShortString(ByteString bytes) {
85+
return bytes.isValidUtf8() ? bytes2ShortUtf8String(bytes) : StringUtils.bytes2HexShortString(bytes);
86+
}
87+
88+
public static String bytes2ShortUtf8String(ByteString bytes) {
89+
final String utf8 = bytes.toStringUtf8();
90+
if (utf8.isEmpty()) {
91+
return "<EMPTY_UTF8>";
92+
}
93+
final int length = utf8.length();
94+
if (length <= 10) {
95+
return utf8;
96+
}
97+
// return only the first 10 characters
98+
return utf8.substring(0, 10) + "...(length=" + length + ")";
99+
}
100+
84101
public static String bytes2HexShortString(ByteString bytes) {
85102
final int size = bytes.size();
86103
if (size == 0) {

ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ratis.server.impl;
1919

2020
import org.apache.ratis.client.impl.ClientProtoUtils;
21+
import org.apache.ratis.client.impl.OrderedAsync;
2122
import org.apache.ratis.conf.RaftProperties;
2223
import org.apache.ratis.metrics.Timekeeper;
2324
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -1064,6 +1065,10 @@ private CompletableFuture<RaftClientReply> writeAsyncImpl(RaftClientRequest requ
10641065
}
10651066

10661067
private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request) {
1068+
if (OrderedAsync.DUMMY.getContent().equals(request.getMessage().getContent())) {
1069+
return CompletableFuture.completedFuture(RaftClientReply.newBuilder().setRequest(request).build());
1070+
}
1071+
10671072
final CompletableFuture<RaftClientReply> reply = checkLeaderState(request);
10681073
if (reply != null) {
10691074
return reply;
@@ -1078,7 +1083,7 @@ private CompletableFuture<RaftClientReply> watchAsync(RaftClientRequest request)
10781083
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
10791084
final long minIndex = request.getType().getStaleRead().getMinIndex();
10801085
final long commitIndex = state.getLog().getLastCommittedIndex();
1081-
LOG.debug("{}: minIndex={}, commitIndex={}", getMemberId(), minIndex, commitIndex);
1086+
LOG.debug("{}: minIndex={}, commitIndex={} from {}", getMemberId(), minIndex, commitIndex, request.getClientId());
10821087
if (commitIndex < minIndex) {
10831088
final StaleReadException e = new StaleReadException(
10841089
"Unable to serve stale-read due to server commit index = " + commitIndex + " < min = " + minIndex);

ratis-server/src/test/java/org/apache/ratis/LinearizableReadTests.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,9 @@ public void setup() {
8888
RaftServerConfigKeys.Read.setOption(p, LINEARIZABLE);
8989
RaftServerConfigKeys.Read.setLeaderLeaseEnabled(p, isLeaderLeaseEnabled());
9090
RaftServerConfigKeys.Read.ReadIndex.setType(p, readIndexType());
91-
// Disable dummy request since currently the request is implemented as a watch request
92-
// which can cause follower client to trigger failover to leader which will cause the
93-
// all reads to be sent to the leader, making the follower read moot.
94-
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, false);
91+
92+
// Enable dummy request so linearizable-read tests exercise the default ordered-async bootstrap path.
93+
RaftClientConfigKeys.Async.Experimental.setSendDummyRequest(p, true);
9594
}
9695

9796
@Test
@@ -151,13 +150,16 @@ static <C extends MiniRaftCluster> void runTestFollowerLinearizableRead(C cluste
151150
final int n = 100;
152151
final List<Reply> f0Replies = new ArrayList<>(n);
153152
final List<Reply> f1Replies = new ArrayList<>(n);
154-
try (RaftClient client = cluster.createClient(leaderId)) {
153+
try (RaftClient client = cluster.createClient(leaderId);
154+
RaftClient c0 = cluster.createClient(f0);
155+
RaftClient c1 = cluster.createClient(f1);
156+
) {
155157
for (int i = 0; i < n; i++) {
156158
final int count = i + 1;
157159
assertReplyExact(count, client.io().send(INCREMENT));
158160

159-
f0Replies.add(new Reply(count, client.async().sendReadOnly(QUERY, f0)));
160-
f1Replies.add(new Reply(count, client.async().sendReadOnly(QUERY, f1)));
161+
f0Replies.add(new Reply(count, c0.async().sendReadOnly(QUERY, f0)));
162+
f1Replies.add(new Reply(count, c1.async().sendReadOnly(QUERY, f1)));
161163
}
162164

163165
for (int i = 0; i < n; i++) {

ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.Map;
6868
import java.util.Optional;
6969
import java.util.Timer;
70+
import java.util.UUID;
7071
import java.util.concurrent.ConcurrentHashMap;
7172
import java.util.concurrent.ExecutorService;
7273
import java.util.concurrent.Executors;
@@ -93,6 +94,8 @@ public abstract class MiniRaftCluster implements Closeable {
9394
TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
9495
static final AtomicInteger THREAD_COUNT = new AtomicInteger(0);
9596

97+
static final AtomicInteger CLIENT_ID = new AtomicInteger(0);
98+
9699
public abstract static class Factory<CLUSTER extends MiniRaftCluster> {
97100
public interface Get<CLUSTER extends MiniRaftCluster> {
98101
Supplier<RaftProperties> PROPERTIES = JavaUtils.memoize(RaftProperties::new);
@@ -747,6 +750,7 @@ public RaftClient createClient(RaftPeerId leaderId, RaftGroup raftGroup, RetryPo
747750

748751
public RaftClient createClient(RaftPeerId leaderId, RaftGroup raftGroup, RetryPolicy retryPolicy, RaftPeer primaryServer) {
749752
RaftClient.Builder builder = RaftClient.newBuilder()
753+
.setClientId(ClientId.valueOf(new UUID(0, CLIENT_ID.incrementAndGet())))
750754
.setRaftGroup(raftGroup)
751755
.setLeaderId(leaderId)
752756
.setProperties(properties)

0 commit comments

Comments
 (0)