Skip to content

Commit 542fcd2

Browse files
Make runBackground use process IDs (#5115)
Currently, `runBackground` uses file locks and generated UUIDs for coordination. This sometimes leads to cases where a new background process is launched while the old one is still running, because the JVM seems to release file locks before network sockets. Because Mill now uses JDK 11, we can use its APIs to transition to process-ID-based watching, which should alleviate this problem. `0.12.x` branch backport: #5120 --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 6ca941d commit 542fcd2

File tree

8 files changed

+248
-66
lines changed

8 files changed

+248
-66
lines changed

.editorconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ end_of_line = lf
55
insert_final_newline = true
66
charset = utf-8
77
indent_style = space
8-
indent_size = 4
8+
indent_size = 2
99

1010
[*.scala]
1111
end_of_line = lf

core/define/src/mill/define/Task.scala

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,8 @@ object Task extends TaskBase {
124124
inline def Command[T](inline t: Result[T])(implicit
125125
inline w: Writer[T],
126126
inline ctx: ModuleCtx
127-
): Command[T] = ${ TaskMacros.commandImpl[T]('t)('w, 'ctx, exclusive = '{ false }) }
127+
): Command[T] =
128+
${ TaskMacros.commandImpl[T]('t)('w, 'ctx, exclusive = '{ false }, persistent = '{ false }) }
128129

129130
/**
130131
* @param exclusive Exclusive commands run serially at the end of an evaluation,
@@ -133,16 +134,20 @@ object Task extends TaskBase {
133134
* These are normally used for "top level" commands which are
134135
* run directly to perform some action or display some output
135136
* to the user.
137+
* @param persistent If true, the `Task.dest` directory is not cleaned between
138+
* runs.
136139
*/
137140
def Command(
138141
t: NamedParameterOnlyDummy = new NamedParameterOnlyDummy,
139-
exclusive: Boolean = false
140-
): CommandFactory = new CommandFactory(exclusive)
141-
class CommandFactory private[mill] (val exclusive: Boolean) {
142+
exclusive: Boolean = false,
143+
persistent: Boolean = false
144+
): CommandFactory = new CommandFactory(exclusive = exclusive, persistent = persistent)
145+
class CommandFactory private[mill] (val exclusive: Boolean, val persistent: Boolean) {
142146
inline def apply[T](inline t: Result[T])(implicit
143147
inline w: Writer[T],
144148
inline ctx: ModuleCtx
145-
): Command[T] = ${ TaskMacros.commandImpl[T]('t)('w, 'ctx, '{ this.exclusive }) }
149+
): Command[T] =
150+
${ TaskMacros.commandImpl[T]('t)('w, 'ctx, '{ this.exclusive }, '{ this.persistent }) }
146151
}
147152

148153
/**
@@ -400,7 +405,8 @@ class Command[+T](
400405
val ctx0: mill.define.ModuleCtx,
401406
val writer: Writer[?],
402407
val isPrivate: Option[Boolean],
403-
val exclusive: Boolean
408+
val exclusive: Boolean,
409+
override val persistent: Boolean
404410
) extends NamedTask[T] {
405411

406412
override def asCommand: Some[Command[T]] = Some(this)
@@ -555,12 +561,21 @@ private object TaskMacros {
555561
)(t: Expr[Result[T]])(
556562
w: Expr[Writer[T]],
557563
ctx: Expr[mill.define.ModuleCtx],
558-
exclusive: Expr[Boolean]
564+
exclusive: Expr[Boolean],
565+
persistent: Expr[Boolean]
559566
): Expr[Command[T]] = {
560567
appImpl[Command, T](
561568
(in, ev) =>
562569
'{
563-
new Command[T]($in, $ev, $ctx, $w, ${ taskIsPrivate() }, exclusive = $exclusive)
570+
new Command[T](
571+
$in,
572+
$ev,
573+
$ctx,
574+
$w,
575+
${ taskIsPrivate() },
576+
exclusive = $exclusive,
577+
persistent = $persistent
578+
)
564579
},
565580
t
566581
)

integration/invalidation/run-background/src/RunBackgroundTests.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,32 @@ object RunBackgroundTests extends UtestIntegrationTestSuite {
3737
os.write(stop, "")
3838
eventually { probeLockAvailable(lock) }
3939
}
40+
41+
test("sequential") - integrationTest { tester =>
42+
// This test fails on Windows. We will need to look into it, but it also does not work on `main` branch, so it's
43+
// not a regression, thus we'll leave it for now.
44+
if (!scala.util.Properties.isWin) {
45+
import tester._
46+
val lock1 = os.temp()
47+
val lock2 = os.temp()
48+
val stop = os.temp()
49+
os.remove(stop)
50+
eval(("foo.runBackground", lock1, stop))
51+
eventually { !probeLockAvailable(lock1) }
52+
eval(("foo.runBackground", lock2, stop))
53+
eventually { !probeLockAvailable(lock2) }
54+
Predef.assert(
55+
probeLockAvailable(lock1),
56+
"first process should be exited after second process is running"
57+
)
58+
59+
if (tester.daemonMode) eval("shutdown")
60+
continually { !probeLockAvailable(lock2) }
61+
os.write(stop, "")
62+
eventually { probeLockAvailable(lock2) }
63+
}
64+
}
65+
4066
test("clean") - integrationTest { tester =>
4167
import tester._
4268
val lock = os.temp()

libs/pythonlib/src/mill/pythonlib/PythonModule.scala

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,8 @@ trait PythonModule extends PipModule with TaskModule { outer =>
175175
*
176176
* @see [[mainScript]]
177177
*/
178-
def runBackground(args: mill.define.Args) = Task.Command {
179-
val (procUuidPath, procLockfile, procUuid) = mill.scalalib.RunModule.backgroundSetup(Task.dest)
178+
def runBackground(args: mill.define.Args) = Task.Command(persistent = true) {
179+
val backgroundPaths = mill.scalalib.RunModule.BackgroundPaths(Task.dest)
180180
val pwd0 = os.Path(java.nio.file.Paths.get(".").toAbsolutePath)
181181

182182
mill.define.BuildCtx.withFilesystemCheckerDisabled {
@@ -185,11 +185,7 @@ trait PythonModule extends PipModule with TaskModule { outer =>
185185
classPath = mill.scalalib.JvmWorkerModule.backgroundWrapperClasspath().map(_.path).toSeq,
186186
jvmArgs = Nil,
187187
env = runnerEnvTask(),
188-
mainArgs = Seq(
189-
procUuidPath.toString,
190-
procLockfile.toString,
191-
procUuid,
192-
"500",
188+
mainArgs = backgroundPaths.toArgs ++ Seq(
193189
"<subprocess>",
194190
pythonExe().path.toString,
195191
mainScript().path.toString
Lines changed: 168 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,78 +1,213 @@
11
package mill.scalalib.backgroundwrapper;
22

3+
import java.io.IOException;
34
import java.io.RandomAccessFile;
45
import java.nio.channels.FileChannel;
5-
import java.nio.file.*;
6+
import java.nio.file.Files;
7+
import java.nio.file.Path;
8+
import java.nio.file.Paths;
9+
import java.nio.file.StandardOpenOption;
10+
import java.time.LocalDateTime;
11+
import java.time.format.DateTimeFormatter;
12+
import java.util.Arrays;
13+
import java.util.Optional;
614

15+
@SuppressWarnings("BusyWait")
716
public class MillBackgroundWrapper {
17+
private static final long NS_IN_S = 1_000_000_000;
18+
819
public static void main(String[] args) throws Exception {
9-
Path procUuidPath = Paths.get(args[0]);
10-
Path procLockfile = Paths.get(args[1]);
11-
String procUuid = args[2];
12-
int lockDelay = Integer.parseInt(args[3]);
20+
var newestProcessIdPath = Paths.get(args[0]);
21+
var currentlyRunningProccesIdPath = Paths.get(args[1]);
22+
var procLockfile = Paths.get(args[2]);
23+
var logPath = Paths.get(args[3]);
24+
var realMain = args[4];
25+
var realArgs = java.util.Arrays.copyOfRange(args, 5, args.length);
26+
27+
// The following code should handle this scenario, when we have 2 processes contending for the
28+
// lock.
29+
//
30+
// This can happen if a new MillBackgroundWrapper is launched while the previous one is still
31+
// running due to rapid
32+
// changes in the source code.
33+
//
34+
// Process 1 starts, writes newest_pid=1, claims lock, writes currently_running_pid = 1
35+
// Process 2 starts, writes newest_pid=2, tries to claim lock but is blocked
36+
// Process 3 starts at the same time as process 2, writes newest_pid=3, tries to claim lock but
37+
// is blocked
38+
//
39+
// Process 1 reads newest_pid=3, terminates, releases lock
40+
// Process 2 claims lock, reads currently_running_pid = 1, waits for process 1 to die, writes
41+
// currently_running_pid = 2
42+
// Process 2 reads newest_pid=3, terminates, releases lock
43+
// Process 3 claims lock, reads currently_running_pid = 2, waits for process 2 to die, writes
44+
// currently_running_pid = 3, then starts
45+
// Process 3 reads newest_pid=3, continues running
1346

14-
Files.writeString(procUuidPath, procUuid, StandardOpenOption.CREATE);
47+
// Indicate to the previous process that we want to take over.
48+
var myPid = ProcessHandle.current().pid();
49+
var myPidStr = "" + myPid;
50+
log(logPath, myPid, "Starting. Writing my PID to " + newestProcessIdPath);
51+
Files.writeString(
52+
newestProcessIdPath,
53+
myPidStr,
54+
StandardOpenOption.CREATE,
55+
StandardOpenOption.TRUNCATE_EXISTING);
1556

1657
// Take a lock on `procLockfile` to ensure that only one
1758
// `runBackground` process is running at any point in time.
59+
log(logPath, myPid, "Acquiring lock on " + procLockfile);
60+
61+
//noinspection resource - this is intentional, file is released when process dies.
1862
RandomAccessFile raf = new RandomAccessFile(procLockfile.toFile(), "rw");
1963
FileChannel chan = raf.getChannel();
2064
if (chan.tryLock() == null) {
21-
System.err.println("Waiting for runBackground lock to be available");
65+
var startTimeNanos = System.nanoTime();
66+
System.err.println("[mill:runBackground] Waiting for runBackground lock to be available");
67+
// this is intentional, lock is released when process dies.
68+
//noinspection ResultOfMethodCallIgnored
2269
chan.lock();
70+
var delta = (System.nanoTime() - startTimeNanos) / NS_IN_S;
71+
System.err.println("[mill:runBackground] Lock acquired after " + delta + "s");
2372
}
73+
log(logPath, myPid, "Lock acquired");
2474

25-
// For some reason even after the previous process exits things like sockets
26-
// may still take time to free, so sleep for a configurable duration before proceeding
27-
Thread.sleep(lockDelay);
75+
var oldProcessPid = readPreviousPid(currentlyRunningProccesIdPath);
76+
log(logPath, myPid, "Old process PID: " + oldProcessPid);
77+
oldProcessPid.ifPresent((oldPid) -> {
78+
var startTimeNanos = System.nanoTime();
79+
log(logPath, myPid, "Waiting for old process to terminate");
80+
var processExisted = waitForPreviousProcessToTerminate(oldPid);
81+
if (processExisted) {
82+
log(
83+
logPath,
84+
myPid,
85+
"Old process terminated in " + (System.nanoTime() - startTimeNanos) / NS_IN_S + "s");
86+
} else {
87+
log(logPath, myPid, "Old process was already terminated.");
88+
}
89+
});
90+
91+
log(logPath, myPid, "Writing my PID to " + currentlyRunningProccesIdPath);
92+
Files.writeString(
93+
currentlyRunningProccesIdPath,
94+
myPidStr,
95+
StandardOpenOption.CREATE,
96+
StandardOpenOption.TRUNCATE_EXISTING);
2897

2998
// Start the thread to watch for updates on the process marker file,
3099
// so we can exit if it is deleted or replaced
31-
long startTime = System.currentTimeMillis();
32-
Thread watcher = new Thread(() -> {
33-
while (true) {
34-
long delta = (System.currentTimeMillis() - startTime) / 1000;
35-
try {
36-
Thread.sleep(1);
37-
String token = Files.readString(procUuidPath);
38-
if (!token.equals(procUuid)) {
39-
System.err.println("runBackground exiting after " + delta + "s");
40-
System.exit(0);
41-
}
42-
} catch (Exception e) {
43-
System.err.println("runBackground exiting after " + delta + "s");
44-
System.exit(0);
45-
}
46-
}
100+
var startTimeNanos = System.nanoTime(); // use nanoTime as it is monotonic
101+
checkIfWeStillNeedToBeRunning(logPath, startTimeNanos, newestProcessIdPath, myPid);
102+
var watcher = new Thread(() -> {
103+
while (true)
104+
checkIfWeStillNeedToBeRunning(logPath, startTimeNanos, newestProcessIdPath, myPid);
47105
});
48-
49106
watcher.setDaemon(true);
50107
watcher.start();
51108

52109
// Actually start the Java main method we wanted to run in the background
53-
String realMain = args[4];
54-
String[] realArgs = java.util.Arrays.copyOfRange(args, 5, args.length);
55110
if (!realMain.equals("<subprocess>")) {
111+
log(
112+
logPath,
113+
myPid,
114+
"Running main method " + realMain + " with args " + Arrays.toString(realArgs));
56115
Class.forName(realMain).getMethod("main", String[].class).invoke(null, (Object) realArgs);
57116
} else {
117+
log(logPath, myPid, "Running subprocess with args " + Arrays.toString(realArgs));
58118
Process subprocess = new ProcessBuilder().command(realArgs).inheritIO().start();
119+
log(
120+
logPath,
121+
myPid,
122+
"Subprocess started with PID " + subprocess.pid() + ", waiting for it to exit");
59123

60124
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
125+
log(logPath, myPid, "Shutdown hook called, terminating subprocess");
61126
subprocess.destroy();
62127

63128
long now = System.currentTimeMillis();
64129

130+
// If the process does not shut down within 100ms, kill it forcibly.
65131
while (subprocess.isAlive() && System.currentTimeMillis() - now < 100) {
66132
try {
67133
Thread.sleep(1);
68134
} catch (InterruptedException e) {
69-
}
70-
if (subprocess.isAlive()) {
71-
subprocess.destroyForcibly();
135+
// do nothing
72136
}
73137
}
138+
if (subprocess.isAlive()) {
139+
log(logPath, myPid, "Forcing subprocess termination");
140+
subprocess.destroyForcibly();
141+
}
74142
}));
75-
System.exit(subprocess.waitFor());
143+
144+
log(logPath, myPid, "Waiting for subprocess to terminate");
145+
var exitCode = subprocess.waitFor();
146+
log(logPath, myPid, "Subprocess terminated with exit code " + exitCode);
147+
System.exit(exitCode);
148+
}
149+
}
150+
151+
private static void checkIfWeStillNeedToBeRunning(
152+
Path logPath, long startTimeNanos, Path newestProcessIdPath, long myPid) {
153+
var delta = (System.nanoTime() - startTimeNanos) / NS_IN_S;
154+
var myPidStr = "" + myPid;
155+
try {
156+
Thread.sleep(50);
157+
var token = Files.readString(newestProcessIdPath);
158+
if (!myPidStr.equals(token)) {
159+
log(
160+
logPath,
161+
myPid,
162+
"New process started, exiting. Token file (" + newestProcessIdPath + ") contents: \""
163+
+ token + "\"");
164+
System.err.println(
165+
"[mill:runBackground] Background process has been replaced with a new process (PID: "
166+
+ token + "), exiting after " + delta + "s");
167+
System.exit(0);
168+
}
169+
} catch (Exception e) {
170+
log(logPath, myPid, "Check if we still need to be running failed, exiting. Exception: " + e);
171+
System.err.println("[mill:runBackground] Background process exiting after " + delta + "s");
172+
System.exit(0);
173+
}
174+
}
175+
176+
static Optional<Long> readPreviousPid(Path pidFilePath) {
177+
try {
178+
var pidStr = Files.readString(pidFilePath);
179+
return Optional.of(Long.parseLong(pidStr));
180+
} catch (IOException | NumberFormatException e) {
181+
return Optional.empty();
182+
}
183+
}
184+
185+
/** Blocks until the process with the given PID is no longer alive. Returns true if a process with that PID was found (and has now terminated), false if no such process existed. */
186+
static boolean waitForPreviousProcessToTerminate(long pid) {
187+
var maybeOldProcess = ProcessHandle.of(pid);
188+
if (maybeOldProcess.isEmpty()) return false;
189+
var oldProcess = maybeOldProcess.get();
190+
191+
try {
192+
while (oldProcess.isAlive()) {
193+
Thread.sleep(50);
194+
}
195+
return true;
196+
} catch (InterruptedException e) {
197+
throw new RuntimeException(e);
198+
}
199+
}
200+
201+
static void log(Path logFile, long myPid, String message) {
202+
var timestamp = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now());
203+
try {
204+
Files.writeString(
205+
logFile,
206+
"[" + timestamp + "] " + myPid + ": " + message + "\n",
207+
StandardOpenOption.CREATE,
208+
StandardOpenOption.APPEND);
209+
} catch (IOException e) {
210+
// do nothing
76211
}
77212
}
78213
}

0 commit comments

Comments
 (0)