Skip to content

Commit bec32ca

Browse files
committed
concurrent testing utils
1 parent 90af142 commit bec32ca

File tree

4 files changed

+972
-1
lines changed

4 files changed

+972
-1
lines changed

mug-concurrent-testing/pom.xml

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
2+
<modelVersion>4.0.0</modelVersion>
3+
<parent>
4+
<groupId>com.google.mug</groupId>
5+
<artifactId>mug-root</artifactId>
6+
<version>9.9.3-SNAPSHOT</version>
7+
</parent>
8+
<artifactId>mug-concurrent-testing</artifactId>
9+
10+
<build>
11+
<pluginManagement>
12+
<plugins>
13+
<plugin>
14+
<groupId>org.sonatype.central</groupId>
15+
<artifactId>central-publishing-maven-plugin</artifactId>
16+
<version>0.8.0</version> <!-- or latest -->
17+
<extensions>true</extensions>
18+
<configuration>
19+
<publishingServerId>ossrh</publishingServerId>
20+
<!-- optional but recommended in CI -->
21+
<autoPublish>true</autoPublish>
22+
<waitUntil>published</waitUntil>
23+
</configuration>
24+
</plugin>
25+
<plugin>
26+
<artifactId>maven-compiler-plugin</artifactId>
27+
<configuration>
28+
<annotationProcessorPaths>
29+
<path>
30+
<groupId>com.google.errorprone</groupId>
31+
<artifactId>error_prone_core</artifactId>
32+
<version>${error_prone.version}</version>
33+
</path>
34+
<path>
35+
<groupId>${project.groupId}</groupId>
36+
<artifactId>mug-errorprone</artifactId>
37+
<version>${project.version}</version>
38+
</path>
39+
<!-- Other annotation processors go here.
40+
If 'annotationProcessorPaths' is set, processors will no longer be
41+
discovered on the regular -classpath; see also 'Using Error Prone
42+
together with other annotation processors' below. -->
43+
</annotationProcessorPaths>
44+
</configuration>
45+
</plugin>
46+
</plugins>
47+
</pluginManagement>
48+
<plugins>
49+
<plugin>
50+
<groupId>org.apache.maven.plugins</groupId>
51+
<artifactId>maven-jar-plugin</artifactId>
52+
<configuration>
53+
<archive>
54+
<manifestEntries>
55+
<Automatic-Module-Name>com.google.mu.testing.concurrent</Automatic-Module-Name>
56+
</manifestEntries>
57+
</archive>
58+
</configuration>
59+
</plugin>
60+
</plugins>
61+
</build>
62+
<dependencies>
63+
<dependency>
64+
<groupId>${project.groupId}</groupId>
65+
<artifactId>mug</artifactId>
66+
<version>${project.version}</version>
67+
</dependency>
68+
<dependency>
69+
<groupId>com.google.errorprone</groupId>
70+
<artifactId>error_prone_annotations</artifactId>
71+
<version>${error_prone_annotations.version}</version>
72+
</dependency>
73+
<dependency>
74+
<groupId>com.google.truth</groupId>
75+
<artifactId>truth</artifactId>
76+
<scope>test</scope>
77+
</dependency>
78+
79+
<dependency>
80+
<groupId>com.google.truth.extensions</groupId>
81+
<artifactId>truth-java8-extension</artifactId>
82+
<scope>test</scope>
83+
</dependency>
84+
</dependencies>
85+
</project>
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
package com.google.mu.testing.concurrent;
2+
3+
import java.lang.invoke.MethodHandles;
4+
import java.lang.invoke.VarHandle;
5+
import java.util.ArrayList;
6+
import java.util.HashMap;
7+
import java.util.LinkedHashSet;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Set;
11+
12+
import com.google.errorprone.annotations.CanIgnoreReturnValue;
13+
import com.google.errorprone.annotations.FormatMethod;
14+
import com.google.errorprone.annotations.ThreadSafe;
15+
import com.google.mu.util.graph.Walker;
16+
import com.google.mu.util.stream.BiStream;
17+
18+
/**
19+
* A utility to manipulate temporal ordering (via {@link #checkpoint}) or happens-before (via {@link
20+
* #join}) relationships between events in concurrent operations. This is useful for testing, where
21+
* you want to ensure that certain actions are executed in a specific order.
22+
*
23+
* <p>Example:
24+
*
25+
* <pre>{@code
26+
* class MyConcurrentTest {
27+
* @Test
28+
* public void testConcurrent() {
29+
* var happens =
30+
* Happenstance.<String>builder()
31+
* .happenInOrder("writtenB", "readingA", "writtenA")
32+
* .happenInOrder("readingB", "writtenB")
33+
* .build();
34+
* Stream.of("A", "B")
35+
* .parallel()
36+
* .forEach(
37+
* input -> {
38+
* happens.join("reading" + input);
39+
* sut.read(input);
40+
* sut.write(input);
41+
* happens.join("written" + input);
42+
* dut.finish(input);
43+
* });
44+
* }
45+
* }
46+
* }</pre>
47+
*
48+
* <p>Implementation note: this class uses VarHandle instead of high-level synchronization
49+
* primitives to avoid introducing unintended memory barrier that may result in false negative tests
50+
* (the test would have failed without the sequence points). When waiting for predecessors, a
51+
* two-stage back-off strategy is employed: {@link Thread#onSpinWait} is called up to 1000 times to
52+
* catch tight visibility races in CPU-bound tests without triggering a context switch; if the
53+
* predecessor is still not ready, {@link Thread#yield} is called to prevent deadlocks or extreme
54+
* performance degradation in I/O-bound or heavily over-provisioned environments.
55+
*
56+
* <p>The {@link Builder#happenInOrder} method is intended to be called from the main thread to set
57+
* up the DAG of relationships between sequence points before the {@code checkpoint()} or {@code
58+
* join()} method is called from any threads.
59+
*
60+
* @param <K> the type of the sequence points
61+
* @since 9.9.3
62+
*/
63+
@ThreadSafe
64+
public final class Happenstance<K> {
65+
private static final VarHandle COMPLETED_STATUS_HANDLE =
66+
MethodHandles.arrayElementVarHandle(int[].class);
67+
private static final int SPIN_THRESHOLD = 1000;
68+
private final Map<K, Integer> pointToIndex;
69+
private final int[][] predecessors;
70+
private final int[] completedStatus; // completion counts
71+
72+
private Happenstance(Builder<K> builder) {
73+
this.pointToIndex = BiStream.from(builder.pointToIndex).toMap();
74+
this.predecessors =
75+
builder.predecessors.stream()
76+
.map(list -> list.stream().mapToInt(Integer::intValue).toArray())
77+
.toArray(int[][]::new);
78+
this.completedStatus = new int[builder.pointToIndex.size()];
79+
}
80+
81+
/** Returns a new {@link Builder}. */
82+
public static <K> Builder<K> builder() {
83+
return new Builder<>();
84+
}
85+
86+
/**
87+
* Builder for {@link Happenstance}.
88+
*
89+
* @param <K> the type of the sequence points
90+
*/
91+
public static final class Builder<K> {
92+
private final Map<K, Integer> pointToIndex = new HashMap<>();
93+
private final List<K> indexToPoint = new ArrayList<>();
94+
private final List<List<Integer>> predecessors = new ArrayList<>();
95+
private final List<Set<Integer>> successors = new ArrayList<>();
96+
97+
Builder() {}
98+
99+
/**
100+
* Defines a happens-before relationship between consecutive {@code sequencePoints}. For
101+
* example, {@code inOrder("A", "B", "C")} specifies that sequence points "A", "B", and "C" must
102+
* be completed in that order ("A" before "B", and "B" before "C").
103+
*
104+
* <p>This method should be called to define all sequence point orders before {@link #build} is
105+
* called.
106+
*
107+
* @throws IllegalArgumentException if adding an edge introduces a cycle in the dependency
108+
* graph.
109+
*/
110+
@CanIgnoreReturnValue
111+
public Builder<K> happenInOrder(K... sequencePoints) {
112+
for (K point : sequencePoints) {
113+
declareSequencePoint(point);
114+
}
115+
for (int i = 0; i < sequencePoints.length - 1; i++) {
116+
int u = pointToIndex.get(sequencePoints[i]);
117+
int v = pointToIndex.get(sequencePoints[i + 1]);
118+
if (u == v) {
119+
continue;
120+
}
121+
if (successors.get(u).add(v)) {
122+
checkArgument(
123+
!isReachable(v, u),
124+
"Adding edge %s -> %s creates a cycle",
125+
sequencePoints[i + 1],
126+
sequencePoints[i]);
127+
predecessors.get(v).add(u);
128+
}
129+
}
130+
return this;
131+
}
132+
133+
public Happenstance<K> build() {
134+
return new Happenstance<>(this);
135+
}
136+
137+
@CanIgnoreReturnValue
138+
private int declareSequencePoint(K id) {
139+
return pointToIndex.computeIfAbsent(
140+
id,
141+
k -> {
142+
int index = indexToPoint.size();
143+
indexToPoint.add(k);
144+
predecessors.add(new ArrayList<>());
145+
successors.add(new LinkedHashSet<>());
146+
return index;
147+
});
148+
}
149+
150+
private boolean isReachable(int from, int to) {
151+
return Walker.inGraph((Integer index) -> successors.get(index).stream())
152+
.breadthFirstFrom(from)
153+
.anyMatch(node -> node.intValue() == to);
154+
}
155+
}
156+
157+
/**
158+
* Joins until all predecessors of {@code sequencePoint} have checked in, then marks {@code
159+
* sequencePoint} as checked-in and returns.
160+
*
161+
* <p>This method differs from {@link #checkpoint} in that it establishes happens-before
162+
* relationship between sequence points, which means writes happening before {@code join(A)} are
163+
* visible to code after {@code join(B)} as long as {@code happenInOrder(A, B)} is specified.
164+
*
165+
* <p><em>Warning:</em>Using {@code join()} inappropriately may result in false negative tests if
166+
* the SUT has a bug that writes to non-volatile state, because the {@code join()} call will
167+
* accidentally "fix" the bug by making the write visible to other threads.
168+
*
169+
* @param sequencePoint the sequence point to wait for and mark as completed.
170+
* @throws IllegalArgumentException if {@code sequencePoint} wasn't defined via {@link
171+
* Builder#happenInOrder}.
172+
* @throws IllegalStateException if {@code sequencePoint} has already been marked as completed.
173+
*/
174+
public void join(K sequencePoint) {
175+
checkIn(sequencePoint, Ordering.HAPPENS_BEFORE);
176+
}
177+
178+
/**
179+
* Waits for all predecessors of {@code sequencePoint} to have checked in, then marks {@code
180+
* sequencePoint} as checked-in and returns.
181+
*
182+
* <p>To avoid introducing unintended memory barriers, this method only establishes temporal
183+
* ordering; no additional happens-before relationship between sequence points is established,
184+
* which means writes before the checkpoint A may still be invisible to reads after checkpoint B
185+
* even with {@code happenInOrder(A, B)}. The SUT itself should establish happens-before
186+
* relationship if necessary.
187+
*
188+
* <p>If extra memory barrier doesn't defeat your concurrency tests, and you need to establish
189+
* happens-before relationships, use {@link #join} instead.
190+
*
191+
* @param sequencePoint the sequence point to wait for and mark as completed.
192+
* @throws IllegalArgumentException if {@code sequencePoint} wasn't defined via {@link
193+
* Builder#happenInOrder}.
194+
* @throws IllegalStateException if {@code sequencePoint} has already been marked as completed.
195+
*/
196+
public void checkpoint(K sequencePoint) {
197+
checkIn(sequencePoint, Ordering.TEMPORAL);
198+
}
199+
200+
private void checkIn(K sequencePoint, Ordering ordering) {
201+
int index = uponSequencePoint(sequencePoint);
202+
int[] statuses = this.completedStatus;
203+
checkState(
204+
Ordering.TEMPORAL.read(statuses, index) == 0,
205+
"sequencePoint '%s' has already been checked in or joined.",
206+
sequencePoint);
207+
for (int predecessor : predecessors[index]) {
208+
for (int spins = 0; ordering.read(statuses, predecessor) == 0; spins++) {
209+
if (spins < SPIN_THRESHOLD) {
210+
Thread.onSpinWait();
211+
} else {
212+
Thread.yield();
213+
}
214+
}
215+
}
216+
ordering.write(statuses, index, 1);
217+
}
218+
219+
private int uponSequencePoint(K sequencePoint) {
220+
Integer index = pointToIndex.get(sequencePoint);
221+
checkArgument(
222+
index != null, "sequencePoint '%s' not defined in happenInOrder()", sequencePoint);
223+
return index;
224+
}
225+
226+
private enum Ordering {
227+
HAPPENS_BEFORE {
228+
@Override
229+
int read(int[] statuses, int index) {
230+
return (int) COMPLETED_STATUS_HANDLE.getAcquire(statuses, index);
231+
}
232+
233+
@Override
234+
void write(int[] statuses, int index, int value) {
235+
COMPLETED_STATUS_HANDLE.setRelease(statuses, index, value);
236+
}
237+
},
238+
TEMPORAL {
239+
@Override
240+
int read(int[] statuses, int index) {
241+
return (int) COMPLETED_STATUS_HANDLE.getOpaque(statuses, index);
242+
}
243+
244+
@Override
245+
void write(int[] statuses, int index, int value) {
246+
COMPLETED_STATUS_HANDLE.setOpaque(statuses, index, value);
247+
}
248+
};
249+
250+
abstract int read(int[] statuses, int index);
251+
252+
abstract void write(int[] statuses, int index, int value);
253+
}
254+
255+
@FormatMethod
256+
private static void checkArgument(boolean condition, String message, Object... args) {
257+
if (!condition) {
258+
throw new IllegalArgumentException(String.format(message, args));
259+
}
260+
}
261+
262+
@FormatMethod
263+
private static void checkState(boolean condition, String message, Object... args) {
264+
if (!condition) {
265+
throw new IllegalStateException(String.format(message, args));
266+
}
267+
}
268+
}

0 commit comments

Comments
 (0)