Skip to content

Commit 0833299

Browse files
authored
Allow to queue async periodic work (#10716)
2 parents e8ea2ff + b0b132d commit 0833299

File tree

2 files changed

+102
-1
lines changed

2 files changed

+102
-1
lines changed

core/src/main/java/hudson/model/AsyncPeriodicWork.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.io.PrintStream;
1010
import java.util.Date;
1111
import java.util.concurrent.TimeUnit;
12+
import java.util.concurrent.atomic.AtomicBoolean;
1213
import java.util.function.Supplier;
1314
import java.util.logging.Level;
1415
import java.util.logging.LogRecord;
@@ -73,6 +74,8 @@ public abstract class AsyncPeriodicWork extends PeriodicWork {
7374

7475
private Thread thread;
7576

77+
private final AtomicBoolean pending = new AtomicBoolean(false);
78+
7679
protected AsyncPeriodicWork(String name) {
7780
this.name = name;
7881
this.logRotateMillis = TimeUnit.MINUTES.toMillis(
@@ -88,7 +91,12 @@ protected AsyncPeriodicWork(String name) {
8891
public final void doRun() {
8992
try {
9093
if (thread != null && thread.isAlive()) {
91-
logger.log(this.getSlowLoggingLevel(), "{0} thread is still running. Execution aborted.", name);
94+
if (queueIfAlreadyRunning()) {
95+
logger.log(this.getSlowLoggingLevel(), "Scheduling another run of {0} since it is already running", name);
96+
pending.set(true);
97+
} else {
98+
logger.log(this.getSlowLoggingLevel(), "{0} thread is still running. Execution aborted.", name);
99+
}
92100
return;
93101
}
94102
thread = new Thread(() -> {
@@ -112,6 +120,11 @@ public final void doRun() {
112120

113121
logger.log(Level.FINE, "Finished {0}. {1,number} ms",
114122
new Object[]{name, stopTime - startTime});
123+
thread = null;
124+
if (pending.getAndSet(false)) {
125+
logger.log(this.getSlowLoggingLevel(), "An execution of {0} was requested while it was running, scheduling another run now", name);
126+
doRun();
127+
}
115128
}, name + " thread");
116129
thread.start();
117130
} catch (Throwable t) {
@@ -262,4 +275,12 @@ protected Level getErrorLoggingLevel() {
262275
* The caller will record the exception and moves on.
263276
*/
264277
protected abstract void execute(TaskListener listener) throws IOException, InterruptedException;
278+
279+
/**
280+
* @return true if a new run should be queued if it is already running while called.
281+
* @since TODO
282+
*/
283+
protected boolean queueIfAlreadyRunning() {
284+
return false;
285+
}
265286
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
package hudson.model;
2+
3+
import static org.awaitility.Awaitility.await;
4+
import static org.hamcrest.MatcherAssert.assertThat;
5+
import static org.hamcrest.Matchers.is;
6+
import static org.hamcrest.Matchers.lessThan;
7+
8+
import hudson.ExtensionList;
9+
import java.time.Duration;
10+
import java.util.concurrent.TimeUnit;
11+
import org.junit.Rule;
12+
import org.junit.Test;
13+
import org.jvnet.hudson.test.JenkinsRule;
14+
import org.jvnet.hudson.test.TestExtension;
15+
16+
public class AsyncPeriodicWorkTest {
17+
@Rule
18+
public JenkinsRule r = new JenkinsRule();
19+
20+
@Test
21+
public void extraCallGetsIgnored() {
22+
var instance = ExtensionList.lookupSingleton(AsyncPeriodicWorkTestImpl.class);
23+
assertThat(instance.getCount(), is(0));
24+
instance.run();
25+
// Gets ignored
26+
instance.run();
27+
await().until(instance::getCount, is(1));
28+
await("Should never reach 2 as the second call should be ignored").during(Duration.ofSeconds(2)).until(instance::getCount, lessThan(2));
29+
}
30+
31+
@Test
32+
public void extraCallGetsQueued() {
33+
var instance = ExtensionList.lookupSingleton(AsyncPeriodicWorkTestImpl.class);
34+
instance.setQueueIfAlreadyRunning(true);
35+
assertThat(instance.getCount(), is(0));
36+
instance.run();
37+
// Gets queued
38+
instance.run();
39+
await("The second call has been queued and executed later").until(instance::getCount, is(2));
40+
}
41+
42+
@TestExtension
43+
public static class AsyncPeriodicWorkTestImpl extends AsyncPeriodicWork {
44+
private boolean queueIfAlreadyRunning;
45+
private int count = 0;
46+
47+
public AsyncPeriodicWorkTestImpl() {
48+
super(AsyncPeriodicWorkTestImpl.class.getSimpleName());
49+
}
50+
51+
@Override
52+
protected void execute(TaskListener listener) throws InterruptedException {
53+
count++;
54+
Thread.sleep(100);
55+
}
56+
57+
public int getCount() {
58+
return count;
59+
}
60+
61+
@Override
62+
public long getRecurrencePeriod() {
63+
return TimeUnit.DAYS.toMillis(1);
64+
}
65+
66+
@Override
67+
public long getInitialDelay() {
68+
return TimeUnit.DAYS.toMillis(1);
69+
}
70+
71+
public void setQueueIfAlreadyRunning(boolean queueIfAlreadyRunning) {
72+
this.queueIfAlreadyRunning = queueIfAlreadyRunning;
73+
}
74+
75+
@Override
76+
protected boolean queueIfAlreadyRunning() {
77+
return queueIfAlreadyRunning;
78+
}
79+
}
80+
}

0 commit comments

Comments
 (0)