Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ end_of_line = lf
insert_final_newline = true
charset = utf-8
indent_style = space
indent_size = 4
indent_size = 2

[*.scala]
end_of_line = lf
Expand Down
31 changes: 23 additions & 8 deletions core/define/src/mill/define/Task.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ object Task extends TaskBase {
inline def Command[T](inline t: Result[T])(implicit
inline w: Writer[T],
inline ctx: ModuleCtx
): Command[T] = ${ TaskMacros.commandImpl[T]('t)('w, 'ctx, exclusive = '{ false }) }
): Command[T] =
${ TaskMacros.commandImpl[T]('t)('w, 'ctx, exclusive = '{ false }, persistent = '{ false }) }

/**
* @param exclusive Exclusive commands run serially at the end of an evaluation,
Expand All @@ -133,16 +134,20 @@ object Task extends TaskBase {
* These are normally used for "top level" commands which are
* run directly to perform some action or display some output
* to the user.
* @param persistent If true the `Task.dest` directory is not cleaned between
* runs.
*/
def Command(
t: NamedParameterOnlyDummy = new NamedParameterOnlyDummy,
exclusive: Boolean = false
): CommandFactory = new CommandFactory(exclusive)
class CommandFactory private[mill] (val exclusive: Boolean) {
exclusive: Boolean = false,
persistent: Boolean = false
): CommandFactory = new CommandFactory(exclusive = exclusive, persistent = persistent)
class CommandFactory private[mill] (val exclusive: Boolean, val persistent: Boolean) {
inline def apply[T](inline t: Result[T])(implicit
inline w: Writer[T],
inline ctx: ModuleCtx
): Command[T] = ${ TaskMacros.commandImpl[T]('t)('w, 'ctx, '{ this.exclusive }) }
): Command[T] =
${ TaskMacros.commandImpl[T]('t)('w, 'ctx, '{ this.exclusive }, '{ this.persistent }) }
}

/**
Expand Down Expand Up @@ -400,7 +405,8 @@ class Command[+T](
val ctx0: mill.define.ModuleCtx,
val writer: Writer[?],
val isPrivate: Option[Boolean],
val exclusive: Boolean
val exclusive: Boolean,
override val persistent: Boolean
) extends NamedTask[T] {

override def asCommand: Some[Command[T]] = Some(this)
Expand Down Expand Up @@ -555,12 +561,21 @@ private object TaskMacros {
)(t: Expr[Result[T]])(
w: Expr[Writer[T]],
ctx: Expr[mill.define.ModuleCtx],
exclusive: Expr[Boolean]
exclusive: Expr[Boolean],
persistent: Expr[Boolean]
): Expr[Command[T]] = {
appImpl[Command, T](
(in, ev) =>
'{
new Command[T]($in, $ev, $ctx, $w, ${ taskIsPrivate() }, exclusive = $exclusive)
new Command[T](
$in,
$ev,
$ctx,
$w,
${ taskIsPrivate() },
exclusive = $exclusive,
persistent = $persistent
)
},
t
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,32 @@ object RunBackgroundTests extends UtestIntegrationTestSuite {
os.write(stop, "")
eventually { probeLockAvailable(lock) }
}

test("sequential") - integrationTest { tester =>
// This test fails on Windows. We will need to look into it, but it also does not work on `main` branch, so it's
// not a regression, thus we'll leave it for now.
if (!scala.util.Properties.isWin) {
import tester._
val lock1 = os.temp()
val lock2 = os.temp()
val stop = os.temp()
os.remove(stop)
eval(("foo.runBackground", lock1, stop))
eventually { !probeLockAvailable(lock1) }
eval(("foo.runBackground", lock2, stop))
eventually { !probeLockAvailable(lock2) }
Predef.assert(
probeLockAvailable(lock1),
"first process should be exited after second process is running"
)

if (tester.daemonMode) eval("shutdown")
continually { !probeLockAvailable(lock2) }
os.write(stop, "")
eventually { probeLockAvailable(lock2) }
}
}

test("clean") - integrationTest { tester =>
import tester._
val lock = os.temp()
Expand Down
10 changes: 3 additions & 7 deletions libs/pythonlib/src/mill/pythonlib/PythonModule.scala
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ trait PythonModule extends PipModule with TaskModule { outer =>
*
* @see [[mainScript]]
*/
def runBackground(args: mill.define.Args) = Task.Command {
val (procUuidPath, procLockfile, procUuid) = mill.scalalib.RunModule.backgroundSetup(Task.dest)
def runBackground(args: mill.define.Args) = Task.Command(persistent = true) {
val backgroundPaths = mill.scalalib.RunModule.BackgroundPaths(Task.dest)
val pwd0 = os.Path(java.nio.file.Paths.get(".").toAbsolutePath)

mill.define.BuildCtx.withFilesystemCheckerDisabled {
Expand All @@ -185,11 +185,7 @@ trait PythonModule extends PipModule with TaskModule { outer =>
classPath = mill.scalalib.JvmWorkerModule.backgroundWrapperClasspath().map(_.path).toSeq,
jvmArgs = Nil,
env = runnerEnvTask(),
mainArgs = Seq(
procUuidPath.toString,
procLockfile.toString,
procUuid,
"500",
mainArgs = backgroundPaths.toArgs ++ Seq(
"<subprocess>",
pythonExe().path.toString,
mainScript().path.toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,78 +1,213 @@
package mill.scalalib.backgroundwrapper;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
import java.nio.file.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.Optional;

@SuppressWarnings("BusyWait")
public class MillBackgroundWrapper {
private static final long NS_IN_S = 1_000_000_000;

public static void main(String[] args) throws Exception {
Path procUuidPath = Paths.get(args[0]);
Path procLockfile = Paths.get(args[1]);
String procUuid = args[2];
int lockDelay = Integer.parseInt(args[3]);
var newestProcessIdPath = Paths.get(args[0]);
var currentlyRunningProccesIdPath = Paths.get(args[1]);
var procLockfile = Paths.get(args[2]);
var logPath = Paths.get(args[3]);
var realMain = args[4];
var realArgs = java.util.Arrays.copyOfRange(args, 5, args.length);

// The following code should handle this scenario, when we have 2 processes contending for the
// lock.
//
// This can happen if a new MillBackgroundWrapper is launched while the previous one is still
// running due to rapid
// changes in the source code.
//
// Process 1 starts, writes newest_pid=1, claims lock, writes currently_running_pid = 1
// Process 2 starts, writes newest_pid=2, tries to claim lock but is blocked
// Process 3 starts at the same time as process 2, writes newest_pid=3, tries to claim lock but
// is blocked
//
// Process 1 reads newest_pid=3, terminates, releases lock
// Process 2 claims lock, reads currently_running_pid = 1, waits for process 1 to die, writes
// currently_running_pid = 2
// Process 2 reads newest_pid=3, terminates, releases lock
// Process 3 claims lock, reads currently_running_pid = 2, waits for process 2 to die, writes
// currently_running_pid = 3, then starts
// Process 3 reads newest_pid=3, continues running

Files.writeString(procUuidPath, procUuid, StandardOpenOption.CREATE);
// Indicate to the previous process that we want to take over.
var myPid = ProcessHandle.current().pid();
var myPidStr = "" + myPid;
log(logPath, myPid, "Starting. Writing my PID to " + newestProcessIdPath);
Files.writeString(
newestProcessIdPath,
myPidStr,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);

// Take a lock on `procLockfile` to ensure that only one
// `runBackground` process is running at any point in time.
log(logPath, myPid, "Acquiring lock on " + procLockfile);

//noinspection resource - this is intentional, file is released when process dies.
RandomAccessFile raf = new RandomAccessFile(procLockfile.toFile(), "rw");
FileChannel chan = raf.getChannel();
if (chan.tryLock() == null) {
System.err.println("Waiting for runBackground lock to be available");
var startTimeNanos = System.nanoTime();
System.err.println("[mill:runBackground] Waiting for runBackground lock to be available");
// this is intentional, lock is released when process dies.
//noinspection ResultOfMethodCallIgnored
chan.lock();
var delta = (System.nanoTime() - startTimeNanos) / NS_IN_S;
System.err.println("[mill:runBackground] Lock acquired after " + delta + "s");
}
log(logPath, myPid, "Lock acquired");

// For some reason even after the previous process exits things like sockets
// may still take time to free, so sleep for a configurable duration before proceeding
Thread.sleep(lockDelay);
var oldProcessPid = readPreviousPid(currentlyRunningProccesIdPath);
log(logPath, myPid, "Old process PID: " + oldProcessPid);
oldProcessPid.ifPresent((oldPid) -> {
var startTimeNanos = System.nanoTime();
log(logPath, myPid, "Waiting for old process to terminate");
var processExisted = waitForPreviousProcessToTerminate(oldPid);
if (processExisted) {
log(
logPath,
myPid,
"Old process terminated in " + (System.nanoTime() - startTimeNanos) / NS_IN_S + "s");
} else {
log(logPath, myPid, "Old process was already terminated.");
}
});

log(logPath, myPid, "Writing my PID to " + currentlyRunningProccesIdPath);
Files.writeString(
currentlyRunningProccesIdPath,
myPidStr,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING);

// Start the thread to watch for updates on the process marker file,
// so we can exit if it is deleted or replaced
long startTime = System.currentTimeMillis();
Thread watcher = new Thread(() -> {
while (true) {
long delta = (System.currentTimeMillis() - startTime) / 1000;
try {
Thread.sleep(1);
String token = Files.readString(procUuidPath);
if (!token.equals(procUuid)) {
System.err.println("runBackground exiting after " + delta + "s");
System.exit(0);
}
} catch (Exception e) {
System.err.println("runBackground exiting after " + delta + "s");
System.exit(0);
}
}
var startTimeNanos = System.nanoTime(); // use nanoTime as it is monotonic
checkIfWeStillNeedToBeRunning(logPath, startTimeNanos, newestProcessIdPath, myPid);
var watcher = new Thread(() -> {
while (true)
checkIfWeStillNeedToBeRunning(logPath, startTimeNanos, newestProcessIdPath, myPid);
});

watcher.setDaemon(true);
watcher.start();

// Actually start the Java main method we wanted to run in the background
String realMain = args[4];
String[] realArgs = java.util.Arrays.copyOfRange(args, 5, args.length);
if (!realMain.equals("<subprocess>")) {
log(
logPath,
myPid,
"Running main method " + realMain + " with args " + Arrays.toString(realArgs));
Class.forName(realMain).getMethod("main", String[].class).invoke(null, (Object) realArgs);
} else {
log(logPath, myPid, "Running subprocess with args " + Arrays.toString(realArgs));
Process subprocess = new ProcessBuilder().command(realArgs).inheritIO().start();
log(
logPath,
myPid,
"Subprocess started with PID " + subprocess.pid() + ", waiting for it to exit");

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
log(logPath, myPid, "Shutdown hook called, terminating subprocess");
subprocess.destroy();

long now = System.currentTimeMillis();

// If the process does not shut down withing 100ms kill it forcibly.
while (subprocess.isAlive() && System.currentTimeMillis() - now < 100) {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
}
if (subprocess.isAlive()) {
subprocess.destroyForcibly();
// do nothing
}
}
if (subprocess.isAlive()) {
log(logPath, myPid, "Forcing subprocess termination");
subprocess.destroyForcibly();
}
}));
System.exit(subprocess.waitFor());

log(logPath, myPid, "Waiting for subprocess to terminate");
var exitCode = subprocess.waitFor();
log(logPath, myPid, "Subprocess terminated with exit code " + exitCode);
System.exit(exitCode);
}
}

private static void checkIfWeStillNeedToBeRunning(
Path logPath, long startTimeNanos, Path newestProcessIdPath, long myPid) {
var delta = (System.nanoTime() - startTimeNanos) / NS_IN_S;
var myPidStr = "" + myPid;
try {
Thread.sleep(50);
var token = Files.readString(newestProcessIdPath);
if (!myPidStr.equals(token)) {
log(
logPath,
myPid,
"New process started, exiting. Token file (" + newestProcessIdPath + ") contents: \""
+ token + "\"");
System.err.println(
"[mill:runBackground] Background process has been replaced with a new process (PID: "
+ token + "), exiting after " + delta + "s");
System.exit(0);
}
} catch (Exception e) {
log(logPath, myPid, "Check if we still need to be running failed, exiting. Exception: " + e);
System.err.println("[mill:runBackground] Background process exiting after " + delta + "s");
System.exit(0);
}
}

static Optional<Long> readPreviousPid(Path pidFilePath) {
try {
var pidStr = Files.readString(pidFilePath);
return Optional.of(Long.parseLong(pidStr));
} catch (IOException | NumberFormatException e) {
return Optional.empty();
}
}

/** Returns true if the process with the given PID has terminated, false if the process did not exist. */
static boolean waitForPreviousProcessToTerminate(long pid) {
var maybeOldProcess = ProcessHandle.of(pid);
if (maybeOldProcess.isEmpty()) return false;
var oldProcess = maybeOldProcess.get();

try {
while (oldProcess.isAlive()) {
Thread.sleep(50);
}
return true;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

static void log(Path logFile, long myPid, String message) {
var timestamp = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now());
try {
Files.writeString(
logFile,
"[" + timestamp + "] " + myPid + ": " + message + "\n",
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
} catch (IOException e) {
// do nothing
}
}
}
Loading