Skip to content

Commit 456c1e3

Browse files
committed
RATIS-2497. Pass server to the dummy watch request in OrderedAsync
1 parent f141e10 commit 456c1e3

8 files changed

Lines changed: 163 additions & 60 deletions

File tree

ratis-client/src/main/java/org/apache/ratis/client/impl/AsyncImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class AsyncImpl implements AsyncRpcApi {
4040
CompletableFuture<RaftClientReply> send(
4141
RaftClientRequest.Type type, Message message, RaftPeerId server) {
4242
return TraceClient.asyncSend(
43-
() -> client.getOrderedAsync().send(type, message, server), type, server);
43+
() -> client.getOrderedAsync(server).send(type, message, server), type, server);
4444
}
4545

4646
@Override

ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedAsync.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -116,12 +116,12 @@ public String toString() {
116116
}
117117
}
118118

119-
static OrderedAsync newInstance(RaftClientImpl client, RaftProperties properties) {
119+
static OrderedAsync newInstance(RaftClientImpl client, RaftPeerId server, RaftProperties properties) {
120120
final OrderedAsync ordered = new OrderedAsync(client, properties);
121121
// send a dummy watch request to establish the connection
122122
// TODO: this is a work around, it is better to fix the underlying RPC implementation
123123
if (RaftClientConfigKeys.Async.Experimental.sendDummyRequest(properties)) {
124-
ordered.send(RaftClientRequest.watchRequestType(), null, null);
124+
ordered.send(RaftClientRequest.watchRequestType(), null, server);
125125
}
126126
return ordered;
127127
}

ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.ratis.util.CollectionUtils;
4848
import org.apache.ratis.util.IOUtils;
4949
import org.apache.ratis.util.JavaUtils;
50+
import org.apache.ratis.util.MemoizedFunction;
5051
import org.apache.ratis.util.MemoizedSupplier;
5152
import org.apache.ratis.util.Preconditions;
5253
import org.apache.ratis.util.TimeDuration;
@@ -180,7 +181,7 @@ private synchronized Set<Long> getAndReset() {
180181

181182
private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
182183

183-
private final Supplier<OrderedAsync> orderedAsync;
184+
private final MemoizedFunction<RaftPeerId, OrderedAsync> orderedAsync;
184185
private final Supplier<AsyncImpl> asyncApi;
185186
private final Supplier<BlockingImpl> blockingApi;
186187
private final Supplier<MessageStreamImpl> messageStreamApi;
@@ -209,7 +210,7 @@ private synchronized Set<Long> getAndReset() {
209210
clientRpc.addRaftPeers(group.getPeers());
210211
this.clientRpc = clientRpc;
211212

212-
this.orderedAsync = JavaUtils.memoize(() -> OrderedAsync.newInstance(this, properties));
213+
this.orderedAsync = MemoizedFunction.valueOf(server -> OrderedAsync.newInstance(this, server, properties));
213214
this.messageStreamApi = JavaUtils.memoize(() -> MessageStreamImpl.newInstance(this, properties));
214215
this.asyncApi = JavaUtils.memoize(() -> new AsyncImpl(this));
215216
this.blockingApi = JavaUtils.memoize(() -> new BlockingImpl(this));
@@ -277,8 +278,8 @@ TimeoutExecutor getScheduler() {
277278
return scheduler;
278279
}
279280

280-
OrderedAsync getOrderedAsync() {
281-
return orderedAsync.get();
281+
OrderedAsync getOrderedAsync(RaftPeerId server) {
282+
return orderedAsync.apply(server);
282283
}
283284

284285
RaftClientRequest newRaftClientRequest(
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.util;
19+
20+
import org.apache.ratis.util.function.CheckedSupplier;
21+
22+
import java.util.Objects;
23+
24+
/**
25+
* This is the base class for the memoized subclass such as
26+
* {@link MemoizedSupplier}, {@link MemoizedFunction}, {@link MemoizedCheckedSupplier}, etc,
27+
* The subclasses provide its own method to retrieve the value,
28+
* such as {@link MemoizedSupplier#get()} and {@link MemoizedFunction#apply(Object)}.
29+
* The subclass method returns a value by invoking its initializer once at the first call
30+
* and then keeps returning the same value for the subsequent calls.
31+
* <p>
32+
* All the subclasses are thread safe.
33+
*
34+
* @param <RETURN> The value type.
35+
* @param <THROW> The throwable type of the initializer.
36+
*/
37+
abstract class MemoizedBase<RETURN, THROW extends Throwable> {
38+
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
39+
private volatile RETURN value = null;
40+
41+
final RETURN init(CheckedSupplier<RETURN, THROW> initializer) throws THROW {
42+
final RETURN initialized = value;
43+
if (initialized != null) {
44+
return initialized;
45+
}
46+
47+
synchronized (this) {
48+
if (value == null) {
49+
value = initializer.get();
50+
Objects.requireNonNull(value, "initializer.get() returns null");
51+
}
52+
return value;
53+
}
54+
}
55+
56+
/** @return is the object initialized? */
57+
public final boolean isInitialized() {
58+
return value != null;
59+
}
60+
61+
/**
62+
* @return the value, which must be already initialized.
63+
* @throws NullPointerException if the value is uninitialized.
64+
*/
65+
public RETURN getInitializedValue() {
66+
return Objects.requireNonNull(value, "Uninitialized: value == null");
67+
}
68+
69+
@Override
70+
public String toString() {
71+
return value != null ? "Memoized:" + value : "Uninitialized";
72+
}
73+
}

ratis-common/src/main/java/org/apache/ratis/util/MemoizedCheckedSupplier.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -25,13 +25,14 @@
2525
* A memoized supplier is a {@link CheckedSupplier}
2626
* which gets a value by invoking its initializer once.
2727
* and then keeps returning the same value as its supplied results.
28-
*
28+
* <p>
2929
* This class is thread safe.
3030
*
3131
* @param <RETURN> The return type of the supplier.
3232
* @param <THROW> The throwable type of the supplier.
3333
*/
3434
public final class MemoizedCheckedSupplier<RETURN, THROW extends Throwable>
35+
extends MemoizedBase<RETURN, THROW>
3536
implements CheckedSupplier<RETURN, THROW> {
3637
/**
3738
* @param supplier to supply at most one non-null value.
@@ -45,9 +46,6 @@ public static <RETURN, THROW extends Throwable> MemoizedCheckedSupplier<RETURN,
4546

4647
private final CheckedSupplier<RETURN, THROW> initializer;
4748

48-
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
49-
private volatile RETURN value = null;
50-
5149
/**
5250
* Create a memoized supplier.
5351
* @param initializer to supply at most one non-null value.
@@ -60,33 +58,14 @@ private MemoizedCheckedSupplier(CheckedSupplier<RETURN, THROW> initializer) {
6058
/** @return the lazily initialized object. */
6159
@Override
6260
public RETURN get() throws THROW {
63-
RETURN v = value;
64-
if (v == null) {
65-
synchronized (this) {
66-
v = value;
67-
if (v == null) {
68-
v = value = Objects.requireNonNull(initializer.get(), "initializer.get() returns null");
69-
}
70-
}
71-
}
72-
return v;
61+
return init(initializer);
7362
}
7463

7564
/**
7665
* @return the already initialized object.
7766
* @throws NullPointerException if the object is uninitialized.
7867
*/
7968
public RETURN getUnchecked() {
80-
return Objects.requireNonNull(value, "value == null");
81-
}
82-
83-
/** @return is the object initialized? */
84-
public boolean isInitialized() {
85-
return value != null;
86-
}
87-
88-
@Override
89-
public String toString() {
90-
return isInitialized()? "Memoized:" + value: "UNINITIALIZED";
69+
return getInitializedValue();
9170
}
9271
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.util;
19+
20+
import java.util.Objects;
21+
import java.util.function.Function;
22+
23+
/**
24+
* A memoized function is a {@link Function}
25+
* which returns a value by invoking its initializer once
26+
* and then keeps returning the same value as its result.
27+
* <p>
28+
* This class is similar to {@link MemoizedSupplier} except that
29+
* the initializer takes a parameter.
30+
* <p>
31+
* This class is thread safe.
32+
*
33+
* @param <RETURN> The function result type.
34+
*/
35+
public final class MemoizedFunction<PARAMETER, RETURN>
36+
extends MemoizedBase<RETURN, RuntimeException>
37+
implements Function<PARAMETER, RETURN> {
38+
/**
39+
* @param function to supply at most one non-null value.
40+
* @return a {@link MemoizedFunction} with the given function.
41+
*/
42+
public static <P, R> MemoizedFunction<P, R> valueOf(Function<P, R> function) {
43+
return function instanceof MemoizedFunction ?
44+
(MemoizedFunction<P, R>) function : new MemoizedFunction<>(function);
45+
}
46+
47+
private final Function<PARAMETER, RETURN> initializer;
48+
49+
/**
50+
* Create a memoized function.
51+
* @param initializer to supply at most one non-null value.
52+
*/
53+
private MemoizedFunction(Function<PARAMETER, RETURN> initializer) {
54+
Objects.requireNonNull(initializer, "initializer == null");
55+
this.initializer = initializer;
56+
}
57+
58+
/**
59+
* @param parameter for passing to the initializer.
60+
* Since the returned function is memoized, the parameter is only used at the first call.
61+
* The parameter in the subsequent calls is ignored.
62+
*
63+
* @return the lazily initialized object.
64+
*/
65+
@Override
66+
public RETURN apply(PARAMETER parameter) {
67+
return init(() -> initializer.apply(parameter));
68+
}
69+
}

ratis-common/src/main/java/org/apache/ratis/util/MemoizedSupplier.java

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information
@@ -24,12 +24,14 @@
2424
* A memoized supplier is a {@link Supplier}
2525
* which gets a value by invoking its initializer once
2626
* and then keeps returning the same value as its supplied results.
27-
*
27+
* <p>
2828
* This class is thread safe.
2929
*
3030
* @param <T> The supplier result type.
3131
*/
32-
public final class MemoizedSupplier<T> implements Supplier<T> {
32+
public final class MemoizedSupplier<T>
33+
extends MemoizedBase<T, RuntimeException>
34+
implements Supplier<T> {
3335
/**
3436
* @param supplier to supply at most one non-null value.
3537
* @return a {@link MemoizedSupplier} with the given supplier.
@@ -40,8 +42,6 @@ public static <T> MemoizedSupplier<T> valueOf(Supplier<T> supplier) {
4042
}
4143

4244
private final Supplier<T> initializer;
43-
@SuppressWarnings({"squid:S3077"}) // Suppress volatile for generic type
44-
private volatile T value = null;
4545

4646
/**
4747
* Create a memoized supplier.
@@ -55,26 +55,6 @@ private MemoizedSupplier(Supplier<T> initializer) {
5555
/** @return the lazily initialized object. */
5656
@Override
5757
public T get() {
58-
T v = value;
59-
if (v == null) {
60-
synchronized (this) {
61-
v = value;
62-
if (v == null) {
63-
v = value = Objects.requireNonNull(initializer.get(),
64-
"initializer.get() returns null");
65-
}
66-
}
67-
}
68-
return v;
69-
}
70-
71-
/** @return is the object initialized? */
72-
public boolean isInitialized() {
73-
return value != null;
74-
}
75-
76-
@Override
77-
public String toString() {
78-
return isInitialized()? "Memoized:" + get(): "UNINITIALIZED";
58+
return init(initializer::get);
7959
}
8060
}

ratis-server/src/test/java/org/apache/ratis/client/impl/RaftClientTestUtil.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@
2929
/** Interface for testing raft client. */
3030
public interface RaftClientTestUtil {
3131
static void assertAsyncRequestSemaphore(RaftClient client, int expectedAvailablePermits, int expectedQueueLength) {
32-
((RaftClientImpl) client).getOrderedAsync().assertRequestSemaphore(expectedAvailablePermits, expectedQueueLength);
32+
((RaftClientImpl) client).getOrderedAsync(null)
33+
.assertRequestSemaphore(expectedAvailablePermits, expectedQueueLength);
3334
}
3435

3536
static ClientInvocationId getClientInvocationId(RaftClient client) {

0 commit comments

Comments
 (0)