Skip to content

Commit cf62da5

Browse files
committed
preventing the read of stale index entries
Signed-off-by: Steve Hawkins <shawkins@redhat.com>
1 parent 81e78c1 commit cf62da5

File tree

1 file changed

+57
-57
lines changed
  • kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache

1 file changed

+57
-57
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/CacheImpl.java

Lines changed: 57 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222
import io.fabric8.kubernetes.client.informers.cache.ItemStore;
2323
import io.fabric8.kubernetes.client.utils.Utils;
2424

25-
import java.util.ArrayList;
2625
import java.util.Collection;
2726
import java.util.Collections;
28-
import java.util.HashMap;
2927
import java.util.HashSet;
28+
import java.util.LinkedHashMap;
3029
import java.util.LinkedHashSet;
3130
import java.util.List;
3231
import java.util.Map;
32+
import java.util.Objects;
3333
import java.util.Optional;
3434
import java.util.Set;
3535
import java.util.concurrent.ConcurrentHashMap;
@@ -47,36 +47,39 @@
4747
*/
4848
public class CacheImpl<T extends HasMetadata> implements Cache<T> {
4949

50-
private static class Index {
51-
private Map<Object, Set<String>> values = new ConcurrentHashMap<Object, Set<String>>();
50+
private static class Index<T extends HasMetadata> {
51+
52+
private final Function<T, List<String>> indexer;
53+
private Map<Object, Map<String, String>> values = new ConcurrentHashMap<Object, Map<String, String>>();
54+
55+
public Index(Function<T, List<String>> indexer) {
56+
this.indexer = indexer;
57+
}
5258

53-
public void update(String indexKey, String key, boolean remove) {
59+
public void update(String indexKey, String key, String resourceVersion, boolean remove) {
5460
if (remove) {
5561
values.computeIfPresent(indexKey == null ? this : indexKey, (k, v) -> {
5662
v.remove(key);
5763
return v.isEmpty() ? null : v;
5864
});
5965
} else {
60-
values.computeIfAbsent(indexKey == null ? this : indexKey, k -> ConcurrentHashMap.newKeySet()).add(key);
66+
values.compute(indexKey == null ? this : indexKey, (k, v) -> v == null ? new ConcurrentHashMap<>() : v).put(key, resourceVersion);
6167
}
6268
}
6369

64-
public Set<String> get(String indexKey) {
65-
return values.getOrDefault(indexKey == null ? this : indexKey, Collections.emptySet());
70+
public Map<String, String> get(String indexKey) {
71+
return values.getOrDefault(indexKey == null ? this : indexKey, Map.of());
6672
}
6773
}
6874

6975
// NAMESPACE_INDEX is the default index function for caching objects
7076
public static final String NAMESPACE_INDEX = "namespace";
7177

72-
// indexers stores index functions by their names
73-
private final Map<String, Function<T, List<String>>> indexers = Collections.synchronizedMap(new HashMap<>());
74-
7578
// items stores object instances
7679
private ItemStore<T> items;
7780

7881
// indices stores objects' key by their indices
79-
private final ConcurrentMap<String, Index> indices = new ConcurrentHashMap<>();
82+
private final ConcurrentMap<String, Index<T>> indices = new ConcurrentHashMap<>();
8083

8184
public CacheImpl() {
8285
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
@@ -97,15 +100,13 @@ public void setItemStore(ItemStore<T> items) {
97100
* @return registered indexers
98101
*/
99102
@Override
100-
public Map<String, Function<T, List<String>>> getIndexers() {
101-
synchronized (indexers) {
102-
return Collections.unmodifiableMap(indexers);
103-
}
103+
public synchronized Map<String, Function<T, List<String>>> getIndexers() {
104+
return Collections.unmodifiableMap(indices.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().indexer)));
104105
}
105106

106107
@Override
107108
public synchronized void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
108-
Set<String> intersection = new HashSet<>(indexers.keySet());
109+
Set<String> intersection = new HashSet<>(indices.keySet());
109110
intersection.retainAll(indexersNew.keySet());
110111
if (!intersection.isEmpty()) {
111112
throw new IllegalArgumentException("Indexer conflict: " + intersection);
@@ -208,29 +209,17 @@ public T getByKey(String key) {
208209
*/
209210
@Override
210211
public List<T> index(String indexName, T obj) {
211-
Function<T, List<String>> indexFunc = this.indexers.get(indexName);
212-
if (indexFunc == null) {
213-
throw new IllegalArgumentException(String.format("index %s doesn't exist!", indexName));
214-
}
215-
Index index = getIndex(indexName);
216-
List<String> indexKeys = indexFunc.apply(obj);
217-
Set<String> returnKeySet = new HashSet<>();
212+
Index<T> index = getIndex(indexName);
213+
List<String> indexKeys = index.indexer.apply(obj);
214+
Map<String, T> result = new LinkedHashMap<>();
218215
for (String indexKey : indexKeys) {
219-
returnKeySet.addAll(index.get(indexKey));
216+
byIndex(indexName, indexKey, result);
220217
}
221218

222-
return getItems(returnKeySet);
223-
}
224-
225-
private List<T> getItems(Set<String> returnKeySet) {
226-
List<T> items = new ArrayList<>(returnKeySet.size());
227-
for (String absoluteKey : returnKeySet) {
228-
Optional.ofNullable(this.items.get(absoluteKey)).ifPresent(items::add);
229-
}
230-
return items;
219+
return List.copyOf(result.values());
231220
}
232221

233-
private Index getIndex(String indexName) {
222+
private Index<T> getIndex(String indexName) {
234223
return Optional.ofNullable(this.indices.get(indexName))
235224
.orElseThrow(() -> new IllegalArgumentException(String.format("index %s doesn't exist!", indexName)));
236225
}
@@ -244,8 +233,7 @@ private Index getIndex(String indexName) {
244233
*/
245234
@Override
246235
public List<String> indexKeys(String indexName, String indexKey) {
247-
Index index = getIndex(indexName);
248-
return new ArrayList<>(index.get(indexKey));
236+
return byIndex(indexName, indexKey).stream().map(this::getKey).collect(Collectors.toList());
249237
}
250238

251239
/**
@@ -257,8 +245,29 @@ public List<String> indexKeys(String indexName, String indexKey) {
257245
*/
258246
@Override
259247
public List<T> byIndex(String indexName, String indexKey) {
260-
Index index = getIndex(indexName);
261-
return getItems(index.get(indexKey));
248+
Map<String, T> result = new LinkedHashMap<>();
249+
byIndex(indexName, indexKey, result);
250+
return List.copyOf(result.values());
251+
}
252+
253+
private void byIndex(String indexName, String indexKey, Map<String, T> result) {
254+
Index<T> index = getIndex(indexName);
255+
Map<String, String> objs = index.get(indexKey);
256+
for (Map.Entry<String, String> entry : objs.entrySet()) {
257+
if (result.containsKey(entry.getKey())) {
258+
continue;
259+
}
260+
T item = this.items.get(entry.getKey());
261+
if (item != null) {
262+
if (!Objects.equals(item.getMetadata().getResourceVersion(), entry.getValue())) {
263+
List<String> values = index.indexer.apply(item);
264+
if (values == null || !values.contains(indexKey)) {
265+
continue; // out-of-date
266+
}
267+
}
268+
result.put(entry.getKey(), item);
269+
}
270+
}
262271
}
263272

264273
/**
@@ -271,26 +280,19 @@ public List<T> byIndex(String indexName, String indexKey) {
271280
* @param key the key
272281
*/
273282
private void updateIndices(T oldObj, T newObj, String key) {
274-
for (Map.Entry<String, Function<T, List<String>>> indexEntry : indexers.entrySet()) {
275-
String indexName = indexEntry.getKey();
276-
Function<T, List<String>> indexFunc = indexEntry.getValue();
277-
Index index = this.indices.get(indexName);
278-
if (index != null) {
279-
updateIndex(key, oldObj, newObj, indexFunc, index);
280-
}
281-
}
283+
indices.values().forEach(i -> updateIndex(key, oldObj, newObj, i));
282284
}
283285

284-
private void updateIndex(String key, T oldObj, T newObj, Function<T, List<String>> indexFunc, Index index) {
285-
List<String> oldValues = getIndexValues(oldObj, indexFunc);
286-
Collection<String> newIndexValues = new LinkedHashSet<>(getIndexValues(newObj, indexFunc));
286+
private void updateIndex(String key, T oldObj, T newObj, Index<T> index) {
287+
List<String> oldValues = getIndexValues(oldObj, index.indexer);
288+
Collection<String> newIndexValues = new LinkedHashSet<>(getIndexValues(newObj, index.indexer));
287289
for (String indexValue : oldValues) {
288-
if (!newIndexValues.remove(indexValue)) {
289-
index.update(indexValue, key, true);
290+
if (!newIndexValues.contains(indexValue)) {
291+
index.update(indexValue, key, null, true);
290292
}
291293
}
292294
for (String indexValue : newIndexValues) {
293-
index.update(indexValue, key, false);
295+
index.update(indexValue, key, newObj.getMetadata().getResourceVersion(), false);
294296
}
295297
}
296298

@@ -314,11 +316,10 @@ public synchronized CacheImpl<T> addIndexFunc(String indexName, Function<T, List
314316
if (this.indices.containsKey(indexName)) {
315317
throw new IllegalArgumentException("Indexer conflict: " + indexName);
316318
}
317-
Index index = new Index();
319+
Index<T> index = new Index<>(indexFunc);
318320
this.indices.put(indexName, index);
319-
this.indexers.put(indexName, indexFunc);
320321

321-
items.values().forEach(v -> updateIndex(getKey(v), null, v, indexFunc, index));
322+
items.values().forEach(v -> updateIndex(getKey(v), null, v, index));
322323
return this;
323324
}
324325

@@ -382,7 +383,6 @@ public static List<String> metaNamespaceIndexFunc(Object obj) {
382383
@Override
383384
public synchronized void removeIndexer(String name) {
384385
this.indices.remove(name);
385-
this.indexers.remove(name);
386386
}
387387

388388
public boolean isFullState() {

0 commit comments

Comments
 (0)