Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* It basically saves and indexes all the entries.
* <br>
* Index reads {@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)}
* are not globally locked and thus may not be fully consistent with the current state
* A {@link ReentrantReadWriteLock} is used to ensure that index reads
* ({@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)})
* are consistent with write operations and never observe partially-updated index state.
*
* @param <T> type for cache object
*/
Expand Down Expand Up @@ -68,14 +71,16 @@ public Set<String> get(String indexKey) {
public static final String NAMESPACE_INDEX = "namespace";

// indexers stores index functions by their names
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();

// items stores object instances
private ItemStore<T> items;

// indices stores objects' key by their indices
private final ConcurrentMap<String, Index> indices = new ConcurrentHashMap<>();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public CacheImpl() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
}
Expand All @@ -96,21 +101,29 @@ public void setItemStore(ItemStore<T> items) {
*/
@Override
public Map<String, Function<T, List<String>>> getIndexers() {
synchronized (indexers) {
return Collections.unmodifiableMap(indexers);
lock.readLock().lock();
try {
return Collections.unmodifiableMap(new HashMap<>(indexers));
} finally {
lock.readLock().unlock();
}
}

@Override
public synchronized void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
throw new IllegalArgumentException("Indexer conflict: " + intersection);
}
public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
lock.writeLock().lock();
try {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
throw new IllegalArgumentException("Indexer conflict: " + intersection);
}

for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexersNew.entrySet()) {
addIndexFunc(indexEntry.getKey(), indexEntry.getValue());
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexersNew.entrySet()) {
addIndexFuncUnlocked(indexEntry.getKey(), indexEntry.getValue());
}
} finally {
lock.writeLock().unlock();
}
}

Expand All @@ -120,14 +133,19 @@ public synchronized void addIndexers(Map<String, Function<T, List<String>>> inde
* @param obj the object
* @return the old object
*/
public synchronized T put(T obj) {
public T put(T obj) {
if (obj == null) {
return null;
}
String key = getKey(obj);
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
lock.writeLock().lock();
try {
String key = getKey(obj);
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
} finally {
lock.writeLock().unlock();
}
}

/**
Expand All @@ -136,13 +154,18 @@ public synchronized T put(T obj) {
* @param obj object
* @return the old object
*/
public synchronized T remove(T obj) {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.updateIndices(old, null, key);
public T remove(T obj) {
lock.writeLock().lock();
try {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.updateIndices(old, null, key);
}
return old;
} finally {
lock.writeLock().unlock();
}
return old;
}

/**
Expand Down Expand Up @@ -206,18 +229,23 @@ public T getByKey(String key) {
*/
@Override
public List<T> index(String indexName, T obj) {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
returnKeySet.addAll(index.get(indexKey));
}
lock.readLock().lock();
try {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
returnKeySet.addAll(index.get(indexKey));
}

return getItems(returnKeySet);
return getItems(returnKeySet);
} finally {
lock.readLock().unlock();
}
}

private List<T> getItems(Set<String> returnKeySet) {
Expand All @@ -242,8 +270,13 @@ private Index getIndex(String indexName) {
*/
@Override
public List<String> indexKeys(String indexName, String indexKey) {
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
lock.readLock().lock();
try {
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
} finally {
lock.readLock().unlock();
}
}

/**
Expand All @@ -255,8 +288,13 @@ public List<String> indexKeys(String indexName, String indexKey) {
*/
@Override
public List<T> byIndex(String indexName, String indexKey) {
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
lock.readLock().lock();
try {
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
} finally {
lock.readLock().unlock();
}
}

/**
Expand Down Expand Up @@ -300,7 +338,16 @@ private void updateIndex(String key, T obj, Function<T, List<String>> indexFunc,
* @param indexName the index name
* @param indexFunc the index func
*/
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
public CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
lock.writeLock().lock();
try {
return addIndexFuncUnlocked(indexName, indexFunc);
} finally {
lock.writeLock().unlock();
}
}

private CacheImpl<T> addIndexFuncUnlocked(String indexName, Function<T, List<String>> indexFunc) {
if (this.indices.containsKey(indexName)) {
throw new IllegalArgumentException("Indexer conflict: " + indexName);
}
Expand Down Expand Up @@ -370,17 +417,22 @@ public static List<String> metaNamespaceIndexFunc(Object obj) {
}

@Override
public synchronized void removeIndexer(String name) {
this.indices.remove(name);
this.indexers.remove(name);
public void removeIndexer(String name) {
lock.writeLock().lock();
try {
this.indices.remove(name);
this.indexers.remove(name);
} finally {
lock.writeLock().unlock();
}
}

public boolean isFullState() {
return items.isFullState();
}

public Object getLockObject() {
return this;
public ReadWriteLock getLock() {
return lock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,12 @@ public String getKey(T obj) {

public void resync() {
// lock to ensure the ordering wrt other events
synchronized (cache.getLockObject()) {
cache.getLock().writeLock().lock();
try {
this.cache.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
} finally {
cache.getLock().writeLock().unlock();
}
}

Expand Down
Loading
Loading