Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### 7.7-SNAPSHOT

#### Bugs
* Fix #7265: (informer) use ReadWriteLock in CacheImpl to prevent index read inconsistency during concurrent updates
* Fix #7543: fix processInlineDuplicateFields to recursively resolve nested inline embeds

#### Improvements
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,17 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
* It basically saves and indexes all the entries.
* <br>
* Index reads {@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)}
* are not globally locked and thus may not be fully consistent with the current state
* A {@link ReentrantReadWriteLock} is used to ensure that index reads
* ({@link #byIndex(String, String)}, {@link #indexKeys(String, String)}, {@link #index(String, HasMetadata)})
* are consistent with write operations and never observe partially-updated index state.
*
* @param <T> type for cache object
*/
Expand Down Expand Up @@ -68,14 +71,16 @@ public Set<String> get(String indexKey) {
public static final String NAMESPACE_INDEX = "namespace";

// indexers stores index functions by their names
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());
private final Map<String, Function<T, List<String>>> indexers = new HashMap<>();

// items stores object instances
private ItemStore<T> items;

// indices stores objects' key by their indices
private final ConcurrentMap<String, Index> indices = new ConcurrentHashMap<>();

private final ReadWriteLock lock = new ReentrantReadWriteLock();

public CacheImpl() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
}
Expand All @@ -96,21 +101,29 @@ public void setItemStore(ItemStore<T> items) {
*/
@Override
public Map<String, Function<T, List<String>>> getIndexers() {
synchronized (indexers) {
return Collections.unmodifiableMap(indexers);
lock.readLock().lock();
try {
return Collections.unmodifiableMap(new HashMap<>(indexers));
} finally {
lock.readLock().unlock();
}
}

@Override
public synchronized void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
throw new IllegalArgumentException("Indexer conflict: " + intersection);
}
public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
lock.writeLock().lock();
try {
Set<String> intersection = new HashSet<>(indexers.keySet());
intersection.retainAll(indexersNew.keySet());
if (!intersection.isEmpty()) {
throw new IllegalArgumentException("Indexer conflict: " + intersection);
}

for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexersNew.entrySet()) {
addIndexFunc(indexEntry.getKey(), indexEntry.getValue());
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexersNew.entrySet()) {
addIndexFuncUnlocked(indexEntry.getKey(), indexEntry.getValue());
}
} finally {
lock.writeLock().unlock();
}
}

Expand All @@ -120,14 +133,19 @@ public synchronized void addIndexers(Map<String, Function<T, List<String>>> inde
* @param obj the object
* @return the old object
*/
public synchronized T put(T obj) {
public T put(T obj) {
if (obj == null) {
return null;
}
String key = getKey(obj);
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
lock.writeLock().lock();
try {
String key = getKey(obj);
T oldObj = this.items.put(key, obj);
this.updateIndices(oldObj, obj, key);
return oldObj;
} finally {
lock.writeLock().unlock();
}
}

/**
Expand All @@ -136,13 +154,18 @@ public synchronized T put(T obj) {
* @param obj object
* @return the old object
*/
public synchronized T remove(T obj) {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.updateIndices(old, null, key);
public T remove(T obj) {
lock.writeLock().lock();
try {
String key = getKey(obj);
T old = this.items.remove(key);
if (old != null) {
this.updateIndices(old, null, key);
}
return old;
} finally {
lock.writeLock().unlock();
}
return old;
}

/**
Expand Down Expand Up @@ -206,18 +229,23 @@ public T getByKey(String key) {
*/
@Override
public List<T> index(String indexName, T obj) {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
returnKeySet.addAll(index.get(indexKey));
}
lock.readLock().lock();
try {
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
if (indexFunc == null) {
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
}
Index index = getIndex(indexName);
List<String> indexKeys = indexFunc.apply(obj);
Set<String> returnKeySet = new HashSet<>();
for (String indexKey : indexKeys) {
returnKeySet.addAll(index.get(indexKey));
}

return getItems(returnKeySet);
return getItems(returnKeySet);
} finally {
lock.readLock().unlock();
}
}

private List<T> getItems(Set<String> returnKeySet) {
Expand All @@ -242,8 +270,13 @@ private Index getIndex(String indexName) {
*/
@Override
public List<String> indexKeys(String indexName, String indexKey) {
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
lock.readLock().lock();
try {
Index index = getIndex(indexName);
return new ArrayList<>(index.get(indexKey));
} finally {
lock.readLock().unlock();
}
}

/**
Expand All @@ -255,8 +288,13 @@ public List<String> indexKeys(String indexName, String indexKey) {
*/
@Override
public List<T> byIndex(String indexName, String indexKey) {
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
lock.readLock().lock();
try {
Index index = getIndex(indexName);
return getItems(index.get(indexKey));
} finally {
lock.readLock().unlock();
}
}

/**
Expand Down Expand Up @@ -300,7 +338,16 @@ private void updateIndex(String key, T obj, Function<T, List<String>> indexFunc,
* @param indexName the index name
* @param indexFunc the index func
*/
public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
public CacheImpl<T> addIndexFunc(String indexName, Function<T, List<String>> indexFunc) {
lock.writeLock().lock();
try {
return addIndexFuncUnlocked(indexName, indexFunc);
} finally {
lock.writeLock().unlock();
}
}

private CacheImpl<T> addIndexFuncUnlocked(String indexName, Function<T, List<String>> indexFunc) {
if (this.indices.containsKey(indexName)) {
throw new IllegalArgumentException("Indexer conflict: " + indexName);
}
Expand Down Expand Up @@ -370,17 +417,22 @@ public static List<String> metaNamespaceIndexFunc(Object obj) {
}

@Override
public synchronized void removeIndexer(String name) {
this.indices.remove(name);
this.indexers.remove(name);
public void removeIndexer(String name) {
lock.writeLock().lock();
try {
this.indices.remove(name);
this.indexers.remove(name);
} finally {
lock.writeLock().unlock();
}
}

public boolean isFullState() {
return items.isFullState();
}

public Object getLockObject() {
return this;
public ReadWriteLock getLock() {
return lock;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,12 @@ public String getKey(T obj) {

public void resync() {
// lock to ensure the ordering wrt other events
synchronized (cache.getLockObject()) {
cache.getLock().writeLock().lock();
try {
this.cache.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<>(i, i), true));
} finally {
cache.getLock().writeLock().unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.informers.impl.cache;

import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Verifies that concurrent index reads never observe partially-updated index state.
*
* @see <a href="https://github.com/fabric8io/kubernetes-client/issues/7265">Issue 7265</a>
*/
class CacheImplConcurrencyTest {

private static final String LABEL_INDEX = "label-index";

@ParameterizedTest(name = "{0} should never miss object during concurrent updates")
@ValueSource(strings = { "byIndex", "indexKeys" })
void indexReadShouldNeverMissObjectDuringConcurrentUpdates(String readMethod) throws InterruptedException {
CacheImpl<Pod> cache = new CacheImpl<>(Cache.NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
cache.addIndexFunc(LABEL_INDEX, pod -> Collections.singletonList(pod.getMetadata().getLabels().get("app")));

Pod pod = new PodBuilder()
.withNewMetadata()
.withName("test-pod")
.withNamespace("default")
.withResourceVersion("1")
.addToLabels("app", "myapp")
.endMetadata()
.build();
cache.put(pod);

int writerThreads = 4;
int readerThreads = 8;
int iterations = 5000;
AtomicBoolean missDetected = new AtomicBoolean(false);
AtomicInteger missCount = new AtomicInteger(0);
CountDownLatch startLatch = new CountDownLatch(1);
CountDownLatch doneLatch = new CountDownLatch(writerThreads + readerThreads);

ExecutorService executor = Executors.newFixedThreadPool(writerThreads + readerThreads);

// Writers: continuously update the pod (same index key "myapp") to trigger remove+re-add in updateIndices
for (int w = 0; w < writerThreads; w++) {
final int writerId = w;
executor.submit(() -> {
try {
startLatch.await();
for (int i = 0; i < iterations; i++) {
Pod updated = new PodBuilder()
.withNewMetadata()
.withName("test-pod")
.withNamespace("default")
.withResourceVersion(String.valueOf(writerId * iterations + i + 2))
.addToLabels("app", "myapp")
.endMetadata()
.build();
cache.put(updated);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneLatch.countDown();
}
});
}

// Readers: continuously query the index and verify the pod is always found
for (int r = 0; r < readerThreads; r++) {
executor.submit(() -> {
try {
startLatch.await();
for (int i = 0; i < iterations; i++) {
boolean empty = readIndex(cache, readMethod);
if (empty) {
missDetected.set(true);
missCount.incrementAndGet();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneLatch.countDown();
}
});
}

// Start all threads simultaneously
startLatch.countDown();
doneLatch.await(30, TimeUnit.SECONDS);
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);

assertThat(missDetected.get())
.as("%s() should never return empty during concurrent updates, but missed %d times", readMethod, missCount.get())
.isFalse();
}

private boolean readIndex(CacheImpl<Pod> cache, String method) {
if ("byIndex".equals(method)) {
List<Pod> result = cache.byIndex(LABEL_INDEX, "myapp");
return result.isEmpty();
} else {
List<String> keys = cache.indexKeys(LABEL_INDEX, "myapp");
return keys.isEmpty();
}
}
}
Loading
Loading