Skip to content

Commit 67349e8

Browse files
committed
atlas: fix potential for caching stale values
Cache values are now versioned and will be discarded if the index version doesn't match.
1 parent 50ca1c6 commit 67349e8

File tree

2 files changed

+75
-62
lines changed

2 files changed

+75
-62
lines changed

spectator-reg-atlas/src/main/java/com/netflix/spectator/atlas/impl/QueryIndex.java

Lines changed: 67 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.Set;
2929
import java.util.concurrent.ConcurrentHashMap;
3030
import java.util.concurrent.CopyOnWriteArraySet;
31+
import java.util.concurrent.atomic.AtomicLong;
3132
import java.util.function.BiConsumer;
3233
import java.util.function.Consumer;
3334
import java.util.function.Function;
@@ -41,13 +42,32 @@
4142
@SuppressWarnings("PMD.LinguisticNaming")
4243
public final class QueryIndex<T> {
4344

45+
public static class CacheValue<V> {
46+
47+
private final long version;
48+
private final List<QueryIndex<V>> indices;
49+
50+
public CacheValue(long version, List<QueryIndex<V>> indices) {
51+
this.version = version;
52+
this.indices = indices;
53+
}
54+
55+
public long version() {
56+
return version;
57+
}
58+
59+
public List<QueryIndex<V>> indices() {
60+
return indices;
61+
}
62+
}
63+
4464
/**
4565
* Supplier to create a new instance of a cache used for other checks. The default should
4666
* be fine for most uses, but heavy uses with many expressions and high throughput may
4767
* benefit from an alternate implementation.
4868
*/
4969
@FunctionalInterface
50-
public interface CacheSupplier<V> extends Supplier<Cache<String, List<QueryIndex<V>>>> {
70+
public interface CacheSupplier<V> extends Supplier<Cache<String, CacheValue<V>>> {
5171
}
5272

5373
/** Default supplier based on a simple LFU cache. */
@@ -60,7 +80,7 @@ public static class DefaultCacheSupplier<V> implements CacheSupplier<V> {
6080
}
6181

6282
@Override
63-
public Cache<String, List<QueryIndex<V>>> get() {
83+
public Cache<String, CacheValue<V>> get() {
6484
return Cache.lfu(registry, "QueryIndex", 100, 1000);
6585
}
6686
}
@@ -138,7 +158,8 @@ private static int compare(String k1, String k2) {
138158
// as much as possible.
139159
private final ConcurrentHashMap<Query.KeyQuery, QueryIndex<T>> otherChecks;
140160
private final PrefixTree otherChecksTree;
141-
private final Cache<String, List<QueryIndex<T>>> otherChecksCache;
161+
private final Cache<String, CacheValue<T>> otherChecksCache;
162+
private final AtomicLong otherChecksVersion;
142163

143164
// Index for :has queries
144165
private volatile QueryIndex<T> hasKeyIdx;
@@ -160,6 +181,7 @@ private QueryIndex(CacheSupplier<T> cacheSupplier, String key) {
160181
this.otherChecks = new ConcurrentHashMap<>();
161182
this.otherChecksTree = new PrefixTree();
162183
this.otherChecksCache = cacheSupplier.get();
184+
this.otherChecksVersion = new AtomicLong();
163185
this.hasKeyIdx = null;
164186
this.otherKeysIdx = null;
165187
this.missingKeysIdx = null;
@@ -238,7 +260,7 @@ private void add(List<Query.KeyQuery> queries, int i, T value) {
238260
QueryIndex<T> idx = otherChecks.computeIfAbsent(kq, id -> QueryIndex.empty(cacheSupplier));
239261
idx.add(queries, j, value);
240262
if (otherChecksTree.put(kq)) {
241-
otherChecksCache.clear();
263+
otherChecksVersion.incrementAndGet();
242264
}
243265

244266
// Not queries should match if the key is missing from the id, so they need to
@@ -325,7 +347,7 @@ private boolean remove(List<Query.KeyQuery> queries, int i, T value) {
325347
if (idx.isEmpty()) {
326348
otherChecks.remove(kq);
327349
if (otherChecksTree.remove(kq)) {
328-
otherChecksCache.clear();
350+
otherChecksVersion.incrementAndGet();
329351
}
330352
}
331353
}
@@ -351,6 +373,32 @@ private boolean remove(List<Query.KeyQuery> queries, int i, T value) {
351373
return result;
352374
}
353375

376+
/** Get cached matches for the value or compute a new one. */
377+
private List<QueryIndex<T>> otherChecksComputeIfAbsent(String value) {
378+
CacheValue<T> cacheValue = otherChecksCache.get(value);
379+
long version = otherChecksVersion.get();
380+
if (cacheValue != null && cacheValue.version == version) {
381+
// Cached value on consistent version of other checks, use the cached value
382+
return cacheValue.indices;
383+
} else if (otherChecks.isEmpty()) {
384+
// No other checks, use empty list
385+
return Collections.emptyList();
386+
} else {
387+
// Compute a new value
388+
List<QueryIndex<T>> tmp = new ArrayList<>();
389+
otherChecksTree.forEach(value, kq -> {
390+
if (kq instanceof Query.In || matches(kq, value)) {
391+
QueryIndex<T> idx = otherChecks.get(kq);
392+
if (idx != null) {
393+
tmp.add(idx);
394+
}
395+
}
396+
});
397+
otherChecksCache.put(value, new CacheValue<>(version, tmp));
398+
return tmp;
399+
}
400+
}
401+
354402
/**
355403
* Returns true if this index is empty and wouldn't match any ids.
356404
*/
@@ -393,7 +441,6 @@ public void forEachMatch(Id id, Consumer<T> consumer) {
393441
forEachMatch(id, 0, new DedupConsumer<>(consumer));
394442
}
395443

396-
@SuppressWarnings("PMD.NPathComplexity")
397444
private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
398445
// Matches for this level
399446
matches.forEach(consumer);
@@ -419,30 +466,13 @@ private void forEachMatch(Id tags, int i, Consumer<T> consumer) {
419466
}
420467

421468
// Scan for matches with other conditions
422-
List<QueryIndex<T>> otherMatches = otherChecksCache.get(v);
423-
if (otherMatches == null) {
424-
// Avoid the list and cache allocations if there are no other checks at
425-
// this level
426-
if (!otherChecks.isEmpty()) {
427-
List<QueryIndex<T>> tmp = new ArrayList<>();
428-
otherChecksTree.forEach(v, kq -> {
429-
if (kq instanceof Query.In || kq.matches(v)) {
430-
QueryIndex<T> idx = otherChecks.get(kq);
431-
if (idx != null) {
432-
tmp.add(idx);
433-
idx.forEachMatch(tags, nextPos, consumer);
434-
}
435-
}
436-
});
437-
otherChecksCache.put(v, tmp);
438-
}
439-
} else {
440-
// Enhanced for loop typically results in iterator being allocated. Using
441-
// size/get avoids the allocation and has better throughput.
442-
final int n = otherMatches.size();
443-
for (int p = 0; p < n; ++p) {
444-
otherMatches.get(p).forEachMatch(tags, nextPos, consumer);
445-
}
469+
List<QueryIndex<T>> otherMatches = otherChecksComputeIfAbsent(v);
470+
471+
// Enhanced for loop typically results in iterator being allocated. Using
472+
// size/get avoids the allocation and has better throughput.
473+
final int n = otherMatches.size();
474+
for (int p = 0; p < n; ++p) {
475+
otherMatches.get(p).forEachMatch(tags, nextPos, consumer);
446476
}
447477

448478
// Check matches for has key
@@ -520,30 +550,13 @@ private void forEachMatchImpl(Function<String, String> tags, Consumer<T> consume
520550
}
521551

522552
// Scan for matches with other conditions
523-
List<QueryIndex<T>> otherMatches = otherChecksCache.get(v);
524-
if (otherMatches == null) {
525-
// Avoid the list and cache allocations if there are no other checks at
526-
// this level
527-
if (!otherChecks.isEmpty()) {
528-
List<QueryIndex<T>> tmp = new ArrayList<>();
529-
otherChecksTree.forEach(v, kq -> {
530-
if (kq instanceof Query.In || matches(kq, v)) {
531-
QueryIndex<T> idx = otherChecks.get(kq);
532-
if (idx != null) {
533-
tmp.add(idx);
534-
idx.forEachMatch(tags, consumer);
535-
}
536-
}
537-
});
538-
otherChecksCache.put(v, tmp);
539-
}
540-
} else {
541-
// Enhanced for loop typically results in iterator being allocated. Using
542-
// size/get avoids the allocation and has better throughput.
543-
final int n = otherMatches.size();
544-
for (int p = 0; p < n; ++p) {
545-
otherMatches.get(p).forEachMatch(tags, consumer);
546-
}
553+
List<QueryIndex<T>> otherMatches = otherChecksComputeIfAbsent(v);
554+
555+
// Enhanced for loop typically results in iterator being allocated. Using
556+
// size/get avoids the allocation and has better throughput.
557+
final int n = otherMatches.size();
558+
for (int p = 0; p < n; ++p) {
559+
otherMatches.get(p).forEachMatch(tags, consumer);
547560
}
548561

549562
// Check matches for has key

spectator-reg-atlas/src/test/java/com/netflix/spectator/atlas/impl/QueryIndexTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,27 +42,27 @@ public class QueryIndexTest {
4242

4343
private final QueryIndex.CacheSupplier<Query> cacheSupplier = new QueryIndex.CacheSupplier<Query>() {
4444
@Override
45-
public Cache<String, List<QueryIndex<Query>>> get() {
46-
return new Cache<String, List<QueryIndex<Query>>>() {
47-
private final Map<String, List<QueryIndex<Query>>> data = new HashMap<>();
45+
public Cache<String, QueryIndex.CacheValue<Query>> get() {
46+
return new Cache<String, QueryIndex.CacheValue<Query>>() {
47+
private final Map<String, QueryIndex.CacheValue<Query>> data = new HashMap<>();
4848

4949
@Override
50-
public List<QueryIndex<Query>> get(String key) {
50+
public QueryIndex.CacheValue<Query> get(String key) {
5151
return data.get(key);
5252
}
5353

5454
@Override
55-
public List<QueryIndex<Query>> peek(String key) {
55+
public QueryIndex.CacheValue<Query> peek(String key) {
5656
return null;
5757
}
5858

5959
@Override
60-
public void put(String key, List<QueryIndex<Query>> value) {
60+
public void put(String key, QueryIndex.CacheValue<Query> value) {
6161
data.put(key, value);
6262
}
6363

6464
@Override
65-
public List<QueryIndex<Query>> computeIfAbsent(String key, Function<String, List<QueryIndex<Query>>> f) {
65+
public QueryIndex.CacheValue<Query> computeIfAbsent(String key, Function<String, QueryIndex.CacheValue<Query>> f) {
6666
return data.computeIfAbsent(key, f);
6767
}
6868

@@ -77,7 +77,7 @@ public int size() {
7777
}
7878

7979
@Override
80-
public Map<String, List<QueryIndex<Query>>> asMap() {
80+
public Map<String, QueryIndex.CacheValue<Query>> asMap() {
8181
return new HashMap<>(data);
8282
}
8383
};

0 commit comments

Comments
 (0)