Skip to content

Commit 19d49dc

Browse files
Desel72claude
andauthored
fix(informer): use ReadWriteLock in CacheImpl to prevent index read inconsistency (#7558)
* fix(informer): use ReadWriteLock in CacheImpl to prevent index read inconsistency * fix: use specific import for ReentrantReadWriteLock in ProcessorStoreTest Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: refactor concurrency test to parameterized and add CHANGELOG entry - Replaced two nearly-identical test methods with a single parameterized test using @valuesource and a shared helper method - Added CHANGELOG entry for issue #7265 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: reduce PR to concurrency reproducer test with @disabled Revert ReadWriteLock changes to CacheImpl, ProcessorStore, and ProcessorStoreTest. Remove CHANGELOG entry. Keep CacheImplConcurrencyTest as a disabled reproducer for #7265, to be enabled once #7575 lands. * fix: assert thread completion in concurrency reproducer test Check return values of doneLatch.await() and executor.awaitTermination() so that a deadlock or hung thread fails the test instead of silently passing. --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent b673281 commit 19d49dc

File tree

1 file changed

+141
-0
lines changed

1 file changed

+141
-0
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.fabric8.kubernetes.client.informers.impl.cache;
17+
18+
import io.fabric8.kubernetes.api.model.Pod;
19+
import io.fabric8.kubernetes.api.model.PodBuilder;
20+
import io.fabric8.kubernetes.client.informers.cache.Cache;
21+
import org.junit.jupiter.api.Disabled;
22+
import org.junit.jupiter.params.ParameterizedTest;
23+
import org.junit.jupiter.params.provider.ValueSource;
24+
25+
import java.util.Collections;
26+
import java.util.List;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicBoolean;
32+
import java.util.concurrent.atomic.AtomicInteger;
33+
34+
import static org.assertj.core.api.Assertions.assertThat;
35+
36+
/**
37+
* Verifies that concurrent index reads never observe partially-updated index state.
38+
*
39+
* @see <a href="https://github.com/fabric8io/kubernetes-client/issues/7265">Issue 7265</a>
40+
*/
41+
@Disabled("https://github.com/fabric8io/kubernetes-client/issues/7265")
42+
class CacheImplConcurrencyTest {
43+
44+
private static final String LABEL_INDEX = "label-index";
45+
46+
@ParameterizedTest(name = "{0} should never miss object during concurrent updates")
47+
@ValueSource(strings = { "byIndex", "indexKeys" })
48+
void indexReadShouldNeverMissObjectDuringConcurrentUpdates(String readMethod) throws InterruptedException {
49+
CacheImpl<Pod> cache = new CacheImpl<>(Cache.NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::metaNamespaceKeyFunc);
50+
cache.addIndexFunc(LABEL_INDEX, pod -> Collections.singletonList(pod.getMetadata().getLabels().get("app")));
51+
52+
Pod pod = new PodBuilder()
53+
.withNewMetadata()
54+
.withName("test-pod")
55+
.withNamespace("default")
56+
.withResourceVersion("1")
57+
.addToLabels("app", "myapp")
58+
.endMetadata()
59+
.build();
60+
cache.put(pod);
61+
62+
int writerThreads = 4;
63+
int readerThreads = 8;
64+
int iterations = 5000;
65+
AtomicBoolean missDetected = new AtomicBoolean(false);
66+
AtomicInteger missCount = new AtomicInteger(0);
67+
CountDownLatch startLatch = new CountDownLatch(1);
68+
CountDownLatch doneLatch = new CountDownLatch(writerThreads + readerThreads);
69+
70+
ExecutorService executor = Executors.newFixedThreadPool(writerThreads + readerThreads);
71+
72+
// Writers: continuously update the pod (same index key "myapp") to trigger remove+re-add in updateIndices
73+
for (int w = 0; w < writerThreads; w++) {
74+
final int writerId = w;
75+
executor.submit(() -> {
76+
try {
77+
startLatch.await();
78+
for (int i = 0; i < iterations; i++) {
79+
Pod updated = new PodBuilder()
80+
.withNewMetadata()
81+
.withName("test-pod")
82+
.withNamespace("default")
83+
.withResourceVersion(String.valueOf(writerId * iterations + i + 2))
84+
.addToLabels("app", "myapp")
85+
.endMetadata()
86+
.build();
87+
cache.put(updated);
88+
}
89+
} catch (InterruptedException e) {
90+
Thread.currentThread().interrupt();
91+
} finally {
92+
doneLatch.countDown();
93+
}
94+
});
95+
}
96+
97+
// Readers: continuously query the index and verify the pod is always found
98+
for (int r = 0; r < readerThreads; r++) {
99+
executor.submit(() -> {
100+
try {
101+
startLatch.await();
102+
for (int i = 0; i < iterations; i++) {
103+
boolean empty = readIndex(cache, readMethod);
104+
if (empty) {
105+
missDetected.set(true);
106+
missCount.incrementAndGet();
107+
}
108+
}
109+
} catch (InterruptedException e) {
110+
Thread.currentThread().interrupt();
111+
} finally {
112+
doneLatch.countDown();
113+
}
114+
});
115+
}
116+
117+
// Start all threads simultaneously
118+
startLatch.countDown();
119+
assertThat(doneLatch.await(30, TimeUnit.SECONDS))
120+
.as("All threads should complete within timeout")
121+
.isTrue();
122+
executor.shutdown();
123+
assertThat(executor.awaitTermination(5, TimeUnit.SECONDS))
124+
.as("Executor should terminate within timeout")
125+
.isTrue();
126+
127+
assertThat(missDetected.get())
128+
.as("%s() should never return empty during concurrent updates, but missed %d times", readMethod, missCount.get())
129+
.isFalse();
130+
}
131+
132+
private boolean readIndex(CacheImpl<Pod> cache, String method) {
133+
if ("byIndex".equals(method)) {
134+
List<Pod> result = cache.byIndex(LABEL_INDEX, "myapp");
135+
return result.isEmpty();
136+
} else {
137+
List<String> keys = cache.indexKeys(LABEL_INDEX, "myapp");
138+
return keys.isEmpty();
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)