Skip to content

Commit 81fa8b6

Browse files
committed
RATIS-2258. Caching TermIndex objects (apache#1239)
1 parent ccba0b5 commit 81fa8b6

5 files changed

Lines changed: 341 additions & 36 deletions

File tree

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.util;
19+
20+
import org.apache.ratis.thirdparty.com.google.common.collect.MapMaker;
21+
22+
import java.util.Map;
23+
import java.util.Objects;
24+
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.ConcurrentMap;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import java.util.function.BiFunction;
28+
import java.util.function.Consumer;
29+
30+
/**
31+
* Weak Value Cache: ({@link OUTER}, {@link INNER}) -> {@link T}.
32+
* <p>
33+
* Note that the cached values are weakly referenced.
34+
* A cached value could be garage-collected (i.e. evicted from the cache)
35+
* when there are no external (strong) references.
36+
*
37+
* @param <OUTER> the type of the outer keys.
38+
* @param <INNER> the type of the inner keys.
39+
* @param <T> the type to be cached.
40+
*/
41+
public final class BiWeakValueCache<OUTER, INNER, T> {
42+
private static <K, V> ConcurrentMap<K, V> newMap() {
43+
return new MapMaker().weakValues().makeMap();
44+
}
45+
46+
private final String outerName;
47+
private final String innerName;
48+
private final String name;
49+
50+
/** For constructing {@link T} values from ({@link OUTER}, {@link INNER}) keys. */
51+
private final BiFunction<OUTER, INNER, T> constructor;
52+
/** Count the number of {@link T} values constructed. */
53+
private final AtomicInteger valueCount = new AtomicInteger(0);
54+
55+
/**
56+
* Actual map {@link OUTER} -> ({@link INNER} -> {@link T})
57+
* for the logical view ({@link OUTER}, {@link INNER}) -> {@link T}.
58+
*/
59+
private final ConcurrentMap<OUTER, ConcurrentMap<INNER, T>> map = new ConcurrentHashMap<>();
60+
61+
/**
62+
* Create a cache for mapping ({@link OUTER}, {@link INNER}) keys to {@link T} values.
63+
*
64+
* @param outerName the name of the outer long.
65+
* @param innerName the name of the inner long.
66+
* @param constructor for constructing {@link T} values.
67+
*/
68+
public BiWeakValueCache(String outerName, String innerName, BiFunction<OUTER, INNER, T> constructor) {
69+
this.outerName = outerName;
70+
this.innerName = innerName;
71+
this.name = "(" + outerName + ", " + innerName + ")-cache";
72+
this.constructor = constructor;
73+
}
74+
75+
private T construct(OUTER outer, INNER inner) {
76+
final T constructed = constructor.apply(outer, inner);
77+
Objects.requireNonNull(constructed, "constructed == null");
78+
valueCount.incrementAndGet();
79+
return constructed;
80+
}
81+
82+
/**
83+
* If the key ({@link OUTER}, {@link INNER}) is in the cache, return the cached values.
84+
* Otherwise, create a new value and then return it.
85+
*/
86+
public T getOrCreate(OUTER outer, INNER inner) {
87+
Objects.requireNonNull(outer, () -> outerName + " (outer) == null");
88+
Objects.requireNonNull(inner, () -> innerName + " (inner) == null");
89+
final ConcurrentMap<INNER, T> innerMap = map.computeIfAbsent(outer, k -> newMap());
90+
final T computed = innerMap.computeIfAbsent(inner, i -> construct(outer, i));
91+
if ((valueCount.get() & 0xFFF) == 0) {
92+
cleanupEmptyInnerMaps(); // cleanup empty maps once in a while
93+
}
94+
return computed;
95+
}
96+
97+
/** @return the value count for the given outer key. */
98+
int count(OUTER outer) {
99+
final ConcurrentMap<INNER, T> innerMap = map.get(outer);
100+
if (innerMap == null) {
101+
return 0;
102+
}
103+
104+
// size() may return incorrect result; see Guava MapMaker javadoc
105+
int n = 0;
106+
for (INNER ignored : innerMap.keySet()) {
107+
n++;
108+
}
109+
return n;
110+
}
111+
112+
void cleanupEmptyInnerMaps() {
113+
// isEmpty() may return incorrect result; see Guava MapMaker javadoc
114+
map.values().removeIf(e -> !e.entrySet().iterator().hasNext());
115+
}
116+
117+
@Override
118+
public String toString() {
119+
return name;
120+
}
121+
122+
/** The cache content for debugging. */
123+
int dump(Consumer<String> out) {
124+
out.accept(name + ":\n");
125+
int emptyCount = 0;
126+
for (Map.Entry<OUTER, ConcurrentMap<INNER, T>> entry : map.entrySet()) {
127+
final OUTER outer = entry.getKey();
128+
final ConcurrentMap<INNER, T> innerMap = entry.getValue();
129+
final int count = count(outer);
130+
if (count == 0) {
131+
emptyCount++;
132+
}
133+
134+
out.accept(" " + outerName + ":" + outer);
135+
out.accept(", " + innerName + ":" + innerMap.keySet());
136+
out.accept(", count=" + count);
137+
out.accept(", size=" + innerMap.size());
138+
out.accept("\n");
139+
}
140+
out.accept(" emptyCount=" + emptyCount);
141+
out.accept("\n");
142+
return emptyCount;
143+
}
144+
}

ratis-server-api/src/main/java/org/apache/ratis/server/protocol/TermIndex.java

Lines changed: 56 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
2121
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
2222
import org.apache.ratis.server.raftlog.RaftLog;
23+
import org.apache.ratis.util.BiWeakValueCache;
2324

2425
import java.util.Comparator;
2526
import java.util.Optional;
@@ -73,43 +74,62 @@ static TermIndex valueOf(LogEntryProto proto) {
7374

7475
/** @return a {@link TermIndex} object. */
7576
static TermIndex valueOf(long term, long index) {
76-
return new TermIndex() {
77-
@Override
78-
public long getTerm() {
79-
return term;
80-
}
81-
82-
@Override
83-
public long getIndex() {
84-
return index;
85-
}
86-
87-
@Override
88-
public boolean equals(Object obj) {
89-
if (obj == this) {
90-
return true;
91-
} else if (!(obj instanceof TermIndex)) {
92-
return false;
77+
return Impl.getCache().getOrCreate(term, index);
78+
}
79+
80+
/**
81+
* An implementation for private use.
82+
* Note that this is not a public API, although this is public class.
83+
*/
84+
final class Impl {
85+
private Impl() { }
86+
87+
private static final BiWeakValueCache<Long, Long, TermIndex> CACHE
88+
= new BiWeakValueCache<>("term", "index", Impl::newTermIndex);
89+
90+
static BiWeakValueCache<Long, Long, TermIndex> getCache() {
91+
return CACHE;
92+
}
93+
94+
private static TermIndex newTermIndex(long term, long index) {
95+
return new TermIndex() {
96+
@Override
97+
public long getTerm() {
98+
return term;
99+
}
100+
101+
@Override
102+
public long getIndex() {
103+
return index;
104+
}
105+
106+
@Override
107+
public boolean equals(Object obj) {
108+
if (obj == this) {
109+
return true;
110+
} else if (!(obj instanceof TermIndex)) {
111+
return false;
112+
}
113+
114+
final TermIndex that = (TermIndex) obj;
115+
return this.getTerm() == that.getTerm()
116+
&& this.getIndex() == that.getIndex();
117+
}
118+
119+
@Override
120+
public int hashCode() {
121+
return Long.hashCode(term) ^ Long.hashCode(index);
122+
}
123+
124+
private String longToString(long n) {
125+
return n >= 0L ? String.valueOf(n) : "~";
93126
}
94127

95-
final TermIndex that = (TermIndex) obj;
96-
return this.getTerm() == that.getTerm()
97-
&& this.getIndex() == that.getIndex();
98-
}
99-
100-
@Override
101-
public int hashCode() {
102-
return Long.hashCode(term) ^ Long.hashCode(index);
103-
}
104-
105-
private String longToString(long n) {
106-
return n >= 0L? String.valueOf(n) : "~";
107-
}
108-
109-
@Override
110-
public String toString() {
111-
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
112-
}
113-
};
128+
@Override
129+
public String toString() {
130+
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
131+
}
132+
};
133+
}
114134
}
115135
}

ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.slf4j.LoggerFactory;
5151

5252
import java.io.IOException;
53+
import java.lang.ref.WeakReference;
5354
import java.lang.reflect.Field;
5455
import java.util.ArrayList;
5556
import java.util.Arrays;
@@ -590,4 +591,18 @@ static void assertSuccessReply(RaftClientReply reply) {
590591
Assert.assertNotNull("reply == null", reply);
591592
Assert.assertTrue("reply is not success: " + reply, reply.isSuccess());
592593
}
594+
595+
static void gc() throws InterruptedException {
596+
// use WeakReference to detect gc
597+
Object obj = new Object();
598+
final WeakReference<Object> weakRef = new WeakReference<>(obj);
599+
obj = null;
600+
601+
// loop until gc has completed.
602+
for (int i = 0; weakRef.get() != null; i++) {
603+
LOG.info("gc {}", i);
604+
System.gc();
605+
Thread.sleep(100);
606+
}
607+
}
593608
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.ratis.server.protocol;
19+
20+
import org.apache.ratis.util.BiWeakValueCache;
21+
22+
public interface ProtocolTestUtils {
23+
static BiWeakValueCache<Long, Long, TermIndex> getTermIndexCache() {
24+
return TermIndex.Impl.getCache();
25+
}
26+
}

0 commit comments

Comments
 (0)