Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions ratis-common/src/main/java/org/apache/ratis/util/BiWeakValueCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.apache.ratis.thirdparty.com.google.common.collect.MapMaker;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
import java.util.function.Consumer;

/**
* Weak Value Cache: ({@link OUTER}, {@link INNER}) -> {@link T}.
* <p>
* Note that the cached values are weakly referenced.
* A cached value could be garage-collected (i.e. evicted from the cache)
* when there are no external (strong) references.
*
* @param <OUTER> the type of the outer keys.
* @param <INNER> the type of the inner keys.
* @param <T> the type to be cached.
*/
public final class BiWeakValueCache<OUTER, INNER, T> {
private static <K, V> ConcurrentMap<K, V> newMap() {
return new MapMaker().weakValues().makeMap();
}

private final String outerName;
private final String innerName;
private final String name;

/** For constructing {@link T} values from ({@link OUTER}, {@link INNER}) keys. */
private final BiFunction<OUTER, INNER, T> constructor;
/** Count the number of {@link T} values constructed. */
private final AtomicInteger valueCount = new AtomicInteger(0);

/**
* Actual map {@link OUTER} -> ({@link INNER} -> {@link T})
* for the logical view ({@link OUTER}, {@link INNER}) -> {@link T}.
*/
private final ConcurrentMap<OUTER, ConcurrentMap<INNER, T>> map = new ConcurrentHashMap<>();

/**
* Create a cache for mapping ({@link OUTER}, {@link INNER}) keys to {@link T} values.
*
* @param outerName the name of the outer long.
* @param innerName the name of the inner long.
* @param constructor for constructing {@link T} values.
*/
public BiWeakValueCache(String outerName, String innerName, BiFunction<OUTER, INNER, T> constructor) {
this.outerName = outerName;
this.innerName = innerName;
this.name = "(" + outerName + ", " + innerName + ")-cache";
this.constructor = constructor;
}

private T construct(OUTER outer, INNER inner) {
final T constructed = constructor.apply(outer, inner);
Objects.requireNonNull(constructed, "constructed == null");
valueCount.incrementAndGet();
return constructed;
}

/**
* If the key ({@link OUTER}, {@link INNER}) is in the cache, return the cached values.
* Otherwise, create a new value and then return it.
*/
public T getOrCreate(OUTER outer, INNER inner) {
Objects.requireNonNull(outer, () -> outerName + " (outer) == null");
Objects.requireNonNull(inner, () -> innerName + " (inner) == null");
final ConcurrentMap<INNER, T> innerMap = map.computeIfAbsent(outer, k -> newMap());
final T computed = innerMap.computeIfAbsent(inner, i -> construct(outer, i));
if ((valueCount.get() & 0xFFF) == 0) {
cleanupEmptyInnerMaps(); // cleanup empty maps once in a while
}
return computed;
}

/** @return the value count for the given outer key. */
int count(OUTER outer) {
final ConcurrentMap<INNER, T> innerMap = map.get(outer);
if (innerMap == null) {
return 0;
}

// size() may return incorrect result; see Guava MapMaker javadoc
int n = 0;
for (INNER ignored : innerMap.keySet()) {
n++;
}
return n;
}

void cleanupEmptyInnerMaps() {
// isEmpty() may return incorrect result; see Guava MapMaker javadoc
map.values().removeIf(e -> !e.entrySet().iterator().hasNext());
}

@Override
public String toString() {
return name;
}

/** The cache content for debugging. */
int dump(Consumer<String> out) {
out.accept(name + ":\n");
int emptyCount = 0;
for (Map.Entry<OUTER, ConcurrentMap<INNER, T>> entry : map.entrySet()) {
final OUTER outer = entry.getKey();
final ConcurrentMap<INNER, T> innerMap = entry.getValue();
final int count = count(outer);
if (count == 0) {
emptyCount++;
}

out.accept(" " + outerName + ":" + outer);
out.accept(", " + innerName + ":" + innerMap.keySet());
out.accept(", count=" + count);
out.accept(", size=" + innerMap.size());
out.accept("\n");
}
out.accept(" emptyCount=" + emptyCount);
out.accept("\n");
return emptyCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.TermIndexProto;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.BiWeakValueCache;

import java.util.Comparator;
import java.util.Optional;
Expand Down Expand Up @@ -73,43 +74,62 @@ static TermIndex valueOf(LogEntryProto proto) {

/** @return a {@link TermIndex} object. */
static TermIndex valueOf(long term, long index) {
return new TermIndex() {
@Override
public long getTerm() {
return term;
}

@Override
public long getIndex() {
return index;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof TermIndex)) {
return false;
return Impl.getCache().getOrCreate(term, index);
}

/**
* An implementation for private use.
* Note that this is not a public API, although this is public class.
*/
final class Impl {
private Impl() { }

private static final BiWeakValueCache<Long, Long, TermIndex> CACHE
= new BiWeakValueCache<>("term", "index", Impl::newTermIndex);

static BiWeakValueCache<Long, Long, TermIndex> getCache() {
return CACHE;
}

private static TermIndex newTermIndex(long term, long index) {
return new TermIndex() {
@Override
public long getTerm() {
return term;
}

@Override
public long getIndex() {
return index;
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof TermIndex)) {
return false;
}

final TermIndex that = (TermIndex) obj;
return this.getTerm() == that.getTerm()
&& this.getIndex() == that.getIndex();
}

@Override
public int hashCode() {
return Long.hashCode(term) ^ Long.hashCode(index);
}

private String longToString(long n) {
return n >= 0L ? String.valueOf(n) : "~";
}

final TermIndex that = (TermIndex) obj;
return this.getTerm() == that.getTerm()
&& this.getIndex() == that.getIndex();
}

@Override
public int hashCode() {
return Long.hashCode(term) ^ Long.hashCode(index);
}

private String longToString(long n) {
return n >= 0L? String.valueOf(n) : "~";
}

@Override
public String toString() {
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
}
};
@Override
public String toString() {
return String.format("(t:%s, i:%s)", longToString(term), longToString(index));
}
};
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.protocol;

import org.apache.ratis.util.BiWeakValueCache;

public interface ProtocolTestUtils {
static BiWeakValueCache<Long, Long, TermIndex> getTermIndexCache() {
return TermIndex.Impl.getCache();
}
}
100 changes: 100 additions & 0 deletions ratis-test/src/test/java/org/apache/ratis/util/TestTermIndex.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.apache.ratis.BaseTest;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.server.protocol.ProtocolTestUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

/** Testing {@link BiWeakValueCache}. */
public class TestTermIndex extends BaseTest {
static BiWeakValueCache<Long, Long, TermIndex> CACHE = ProtocolTestUtils.getTermIndexCache();

static void dumpCache(Integer expectedEmptyCount) {
final int computed = CACHE.dump(System.out::print);
if (expectedEmptyCount != null) {
assertEquals(expectedEmptyCount, computed);
}
System.out.flush();
}

static void assertCacheSize(int expectedSize, long term) {
final int computed = CACHE.count(term);
if (computed != expectedSize) {
dumpCache(null);
}
assertEquals(expectedSize, computed);
}

void assertCacheSizeWithGC(int expectedSize, long term) throws Exception{
JavaUtils.attempt(() -> {
RaftTestUtil.gc();
assertCacheSize(expectedSize, term);
}, 5, HUNDRED_MILLIS, "assertCacheSizeWithGC", LOG);
}

static void initTermIndex(TermIndex[][] ti, int term, int index) {
ti[term][index] = TermIndex.valueOf(term, index);
}

@Test
public void testCaching() throws Exception {
final int n = 9;
final TermIndex[][] ti = new TermIndex[n][n];
final long[] terms = new long[n];
final long[] indices = new long[n];
for(int j = 0; j < n; j++) {
terms[j] = j;
indices[j] = j;
}

assertCacheSize(0, terms[1]);
initTermIndex(ti, 1, 1);
assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
assertCacheSize(1, terms[1]);

initTermIndex(ti, 1, 2);
assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
assertCacheSize(2, terms[1]);
dumpCache(0);

initTermIndex(ti, 2, 2);
assertSame(ti[1][1], TermIndex.valueOf(terms[1], indices[1]));
assertSame(ti[1][2], TermIndex.valueOf(terms[1], indices[2]));
assertSame(ti[2][2], TermIndex.valueOf(terms[2], indices[2]));
assertCacheSize(2, terms[1]);
assertCacheSize(1, terms[2]);
dumpCache(0);

ti[1][1] = null; // release ti[1][1];
assertCacheSizeWithGC(1, terms[1]);
dumpCache(0);

ti[1][2] = null; // release ti[1][2];
assertCacheSizeWithGC(0, terms[1]);
dumpCache(1);

CACHE.cleanupEmptyInnerMaps();
dumpCache(0);
}
}
Loading