Skip to content

Commit c211223

Browse files
authored
Merge pull request #347 from dwnusbaum/infinite-loop-memory-leak
Prevent `StepExecutionIterator` from leaking memory in cases where a single processed execution has a stuck CPS VM thread
2 parents e986fb0 + b15fde4 commit c211223

File tree

3 files changed

+110
-26
lines changed

3 files changed

+110
-26
lines changed

pom.xml

+6
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,12 @@
8888
<groupId>org.jenkins-ci.plugins</groupId>
8989
<artifactId>scm-api</artifactId>
9090
</dependency>
91+
<dependency>
92+
<groupId>org.awaitility</groupId>
93+
<artifactId>awaitility</artifactId>
94+
<version>4.2.2</version>
95+
<scope>test</scope>
96+
</dependency>
9197
<dependency>
9298
<groupId>org.jenkins-ci.plugins.workflow</groupId>
9399
<artifactId>workflow-job</artifactId>

src/main/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionList.java

+16-15
Original file line numberDiff line numberDiff line change
@@ -253,24 +253,25 @@ public ListenableFuture<?> apply(final Function<StepExecution, Void> f) {
253253

254254
for (FlowExecution e : FlowExecutionList.get()) {
255255
ListenableFuture<List<StepExecution>> execs = e.getCurrentExecutions(false);
256-
all.add(execs);
257-
Futures.addCallback(execs,new FutureCallback<List<StepExecution>>() {
258-
@Override
259-
public void onSuccess(@NonNull List<StepExecution> result) {
260-
for (StepExecution e : result) {
261-
try {
262-
f.apply(e);
263-
} catch (RuntimeException x) {
264-
LOGGER.log(Level.WARNING, null, x);
265-
}
256+
// It is important that the combined future's return values do not reference the individual step
257+
// executions, so we use transform instead of addCallback. Otherwise, it is possible to leak references
258+
// to the WorkflowRun for each processed StepExecution in the case where a single live FlowExecution
259+
// has a stuck CpsVmExecutorService that prevents the getCurrentExecutions future from completing.
260+
ListenableFuture<Void> results = Futures.transform(execs, (List<StepExecution> result) -> {
261+
for (StepExecution se : result) {
262+
try {
263+
f.apply(se);
264+
} catch (RuntimeException x) {
265+
LOGGER.log(Level.WARNING, null, x);
266266
}
267267
}
268-
269-
@Override
270-
public void onFailure(@NonNull Throwable t) {
271-
LOGGER.log(Level.WARNING, null, t);
272-
}
268+
return null;
269+
}, MoreExecutors.directExecutor());
270+
ListenableFuture<Void> resultsWithWarningsLogged = Futures.catching(results, Throwable.class, t -> {
271+
LOGGER.log(Level.WARNING, null, t);
272+
return null;
273273
}, MoreExecutors.directExecutor());
274+
all.add(resultsWithWarningsLogged);
274275
}
275276

276277
return Futures.allAsList(all);

src/test/java/org/jenkinsci/plugins/workflow/flow/FlowExecutionListTest.java

+88-11
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424

2525
package org.jenkinsci.plugins.workflow.flow;
2626

27+
import static org.awaitility.Awaitility.await;
2728
import static org.hamcrest.MatcherAssert.assertThat;
2829
import static org.hamcrest.Matchers.containsString;
2930
import static org.hamcrest.Matchers.hasItem;
@@ -38,20 +39,22 @@
3839
import hudson.model.TaskListener;
3940
import hudson.model.queue.QueueTaskFuture;
4041
import java.io.Serializable;
41-
import java.time.Duration;
42-
import java.time.Instant;
42+
import java.lang.ref.WeakReference;
4343
import java.util.Collections;
44+
import java.util.HashMap;
45+
import java.util.Map;
4446
import java.util.Set;
45-
import java.util.function.Supplier;
47+
import java.util.concurrent.TimeUnit;
4648
import java.util.logging.Level;
47-
import org.hamcrest.Matcher;
49+
import jenkins.model.Jenkins;
4850
import org.jenkinsci.plugins.workflow.cps.CpsFlowDefinition;
4951
import org.jenkinsci.plugins.workflow.job.WorkflowJob;
5052
import org.jenkinsci.plugins.workflow.job.WorkflowRun;
5153
import org.jenkinsci.plugins.workflow.steps.Step;
5254
import org.jenkinsci.plugins.workflow.steps.StepContext;
5355
import org.jenkinsci.plugins.workflow.steps.StepDescriptor;
5456
import org.jenkinsci.plugins.workflow.steps.StepExecution;
57+
import org.jenkinsci.plugins.workflow.steps.StepExecutions;
5558
import org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep;
5659
import org.junit.ClassRule;
5760
import org.junit.Test;
@@ -60,6 +63,7 @@
6063
import org.jvnet.hudson.test.Issue;
6164
import org.jvnet.hudson.test.LoggerRule;
6265
import org.jvnet.hudson.test.JenkinsSessionRule;
66+
import org.jvnet.hudson.test.MemoryAssert;
6367
import org.jvnet.hudson.test.TestExtension;
6468
import org.kohsuke.stapler.DataBoundConstructor;
6569

@@ -132,7 +136,7 @@ public class FlowExecutionListTest {
132136
at org.jenkinsci.plugins.workflow.flow.FlowExecutionList$ItemListenerImpl.onLoaded(FlowExecutionList.java:175)
133137
at jenkins.model.Jenkins.<init>(Jenkins.java:1019)
134138
*/
135-
waitFor(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
139+
await().atMost(5, TimeUnit.SECONDS).until(logging::getMessages, hasItem(containsString("Will resume [org.jenkinsci.plugins.workflow.test.steps.SemaphoreStep")));
136140
WorkflowJob p = r.jenkins.getItemByFullName("p", WorkflowJob.class);
137141
SemaphoreStep.success("wait/1", null);
138142
WorkflowRun b = p.getBuildByNumber(1);
@@ -160,6 +164,34 @@ public class FlowExecutionListTest {
160164
});
161165
}
162166

167+
@Test public void stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck() throws Throwable {
168+
sessions.then(r -> {
169+
var notStuck = r.createProject(WorkflowJob.class, "not-stuck");
170+
notStuck.setDefinition(new CpsFlowDefinition("semaphore 'wait'", true));
171+
var notStuckBuild = notStuck.scheduleBuild2(0).waitForStart();
172+
SemaphoreStep.waitForStart("wait/1", notStuckBuild);
173+
WeakReference<Object> notStuckBuildRef = new WeakReference<>(notStuckBuild);
174+
// Create a Pipeline that runs a long-lived task on its CpsVmExecutorService, causing it to get stuck.
175+
var stuck = r.createProject(WorkflowJob.class, "stuck");
176+
stuck.setDefinition(new CpsFlowDefinition("blockSynchronously 'stuck'", false));
177+
var stuckBuild = stuck.scheduleBuild2(0).waitForStart();
178+
await().atMost(5, TimeUnit.SECONDS).until(() -> SynchronousBlockingStep.isStarted("stuck"));
179+
// Make FlowExecutionList$StepExecutionIteratorImpl.applyAll submit a task to the CpsVmExecutorService
180+
// for stuck #1 that will never complete, so the resulting future will never complete.
181+
StepExecution.applyAll(e -> null);
182+
// Let notStuckBuild complete and clean up all references.
183+
SemaphoreStep.success("wait/1", null);
184+
r.waitForCompletion(notStuckBuild);
185+
notStuckBuild = null; // Clear out the local variable in this thread.
186+
Jenkins.get().getQueue().clearLeftItems(); // Otherwise we'd have to wait 5 minutes for the cache to be cleared.
187+
// Make sure that the reference can be GC'd.
188+
MemoryAssert.assertGC(notStuckBuildRef, true);
189+
// Allow stuck #1 to complete so the test can be cleaned up promptly.
190+
SynchronousBlockingStep.unblock("stuck");
191+
r.waitForCompletion(stuckBuild);
192+
});
193+
}
194+
163195
public static class NonResumableStep extends Step implements Serializable {
164196
public static final long serialVersionUID = 1L;
165197
@DataBoundConstructor
@@ -198,14 +230,59 @@ public String getFunctionName() {
198230
}
199231

200232
/**
201-
* Wait up to 5 seconds for the given supplier to return a matching value.
233+
* Blocks the CPS VM thread synchronously (bad!) to test related problems.
202234
*/
203-
private static <T> void waitFor(Supplier<T> valueSupplier, Matcher<T> matcher) throws InterruptedException {
204-
Instant end = Instant.now().plus(Duration.ofSeconds(5));
205-
while (!matcher.matches(valueSupplier.get()) && Instant.now().isBefore(end)) {
206-
Thread.sleep(100L);
235+
public static class SynchronousBlockingStep extends Step implements Serializable {
236+
private static final long serialVersionUID = 1L;
237+
private static final Map<String, State> blocked = new HashMap<>();
238+
private final String id;
239+
240+
@DataBoundConstructor
241+
public SynchronousBlockingStep(String id) {
242+
this.id = id;
243+
if (blocked.put(id, State.NOT_STARTED) != null) {
244+
throw new IllegalArgumentException("Attempting to reuse ID: " + id);
245+
}
246+
}
247+
248+
@Override
249+
public StepExecution start(StepContext context) throws Exception {
250+
return StepExecutions.synchronous(context, c -> {
251+
blocked.put(id, State.BLOCKED);
252+
c.get(TaskListener.class).getLogger().println(id + " blocked");
253+
while (blocked.get(id) == State.BLOCKED) {
254+
Thread.sleep(100L);
255+
}
256+
c.get(TaskListener.class).getLogger().println(id + " unblocked ");
257+
return null;
258+
});
259+
}
260+
261+
public static boolean isStarted(String id) {
262+
var state = blocked.get(id);
263+
return state != null && state != State.NOT_STARTED;
264+
}
265+
266+
public static void unblock(String id) {
267+
blocked.put(id, State.UNBLOCKED);
268+
}
269+
270+
private enum State {
271+
NOT_STARTED,
272+
BLOCKED,
273+
UNBLOCKED,
274+
}
275+
276+
@TestExtension("stepExecutionIteratorDoesNotLeakBuildsWhenOneIsStuck") public static class DescriptorImpl extends StepDescriptor {
277+
@Override
278+
public Set<? extends Class<?>> getRequiredContext() {
279+
return Collections.singleton(TaskListener.class);
280+
}
281+
@Override
282+
public String getFunctionName() {
283+
return "blockSynchronously";
284+
}
207285
}
208-
assertThat("Matcher should have matched after 5s", valueSupplier.get(), matcher);
209286
}
210287

211288
}

0 commit comments

Comments
 (0)