diff --git a/core/api/src/mill/api/Watchable.scala b/core/api/src/mill/api/Watchable.scala index c137c36eba9..b47d99bef5a 100644 --- a/core/api/src/mill/api/Watchable.scala +++ b/core/api/src/mill/api/Watchable.scala @@ -8,6 +8,24 @@ package mill.api */ private[mill] sealed trait Watchable private[mill] object Watchable { - case class Path(p: java.nio.file.Path, quick: Boolean, signature: Int) extends Watchable - case class Value(f: () => Long, signature: Long, pretty: String) extends Watchable + + /** A [[Watchable]] that is being watched via polling. */ + private[mill] sealed trait Pollable extends Watchable + + /** A [[Watchable]] that is being watched via a notification system (like inotify). */ + private[mill] sealed trait Notifiable extends Watchable + + /** + * @param p the path to watch + * @param quick if true, only watch file attributes + * @param signature the initial hash of the path contents + */ + case class Path(p: java.nio.file.Path, quick: Boolean, signature: Int) extends Notifiable + + /** + * @param f the expression to watch, returns some sort of hash + * @param signature the initial hash from the first invocation of the expression + * @param pretty human-readable name + */ + case class Value(f: () => Long, signature: Long, pretty: String) extends Pollable } diff --git a/integration/invalidation/watch-source-input/src/WatchSourceInputTests.scala b/integration/invalidation/watch-source-input/src/WatchSourceInputTests.scala index 15d6efb974d..610d41c354a 100644 --- a/integration/invalidation/watch-source-input/src/WatchSourceInputTests.scala +++ b/integration/invalidation/watch-source-input/src/WatchSourceInputTests.scala @@ -44,12 +44,9 @@ trait WatchTests extends UtestIntegrationTestSuite { val expectedShows0 = mutable.Buffer.empty[String] val res = f(expectedOut, expectedErr, expectedShows0) val (shows, out) = res.out.linesIterator.toVector.partition(_.startsWith("\"")) - val err = res.err.linesIterator.toVector - .filter(!_.contains("Compiling compiler interface...")) - .filter(!_.contains("Watching for changes")) - .filter(!_.contains("[info] compiling")) - .filter(!_.contains("[info] done compiling")) - .filter(!_.contains("mill-server/ exitCode file not found")) + val err = res.err.linesIterator.toVector.filter(s => + s.startsWith("Setting up ") || s.startsWith("Running ") + ) assert(out == expectedOut) diff --git a/libs/main/src/mill/main/MainModule.scala b/libs/main/src/mill/main/MainModule.scala index 9bad10aeee2..fb4c7fd0e4e 100644 --- a/libs/main/src/mill/main/MainModule.scala +++ b/libs/main/src/mill/main/MainModule.scala @@ -22,8 +22,8 @@ abstract class MainRootModule()(implicit * [[show]], [[inspect]], [[plan]], etc. */ trait MainModule extends BaseModule with MainModuleApi { - protected[mill] val watchedValues: mutable.Buffer[Watchable] = mutable.Buffer.empty[Watchable] - protected[mill] val evalWatchedValues: mutable.Buffer[Watchable] = mutable.Buffer.empty[Watchable] + protected[mill] val watchedValues: mutable.Buffer[Watchable] = mutable.Buffer.empty + protected[mill] val evalWatchedValues: mutable.Buffer[Watchable] = mutable.Buffer.empty object interp { def watchValue[T](v0: => T)(implicit fn: sourcecode.FileName, ln: sourcecode.Line): T = { os.checker.withValue(os.Checker.Nop) { diff --git a/mill-build/src/millbuild/Deps.scala b/mill-build/src/millbuild/Deps.scala index c5634a02e17..54df02061de 100644 --- a/mill-build/src/millbuild/Deps.scala +++ b/mill-build/src/millbuild/Deps.scala @@ -117,7 +117,9 @@ object Deps { val junitInterface = mvn"com.github.sbt:junit-interface:0.13.3" val commonsIo = mvn"commons-io:commons-io:2.18.0" val log4j2Core = mvn"org.apache.logging.log4j:log4j-core:2.24.3" - val osLib = mvn"com.lihaoyi::os-lib:0.11.5-M8" + val osLibVersion = "0.11.5-M8" + val osLib = mvn"com.lihaoyi::os-lib:$osLibVersion" + val osLibWatch = mvn"com.lihaoyi::os-lib-watch:$osLibVersion" val pprint = mvn"com.lihaoyi::pprint:0.9.0" val mainargs = mvn"com.lihaoyi::mainargs:0.7.6" val millModuledefsVersion = "0.11.4" diff --git a/runner/daemon/package.mill b/runner/daemon/package.mill index fc48dd8d71c..32f51867d61 100644 --- a/runner/daemon/package.mill +++ b/runner/daemon/package.mill @@ -19,6 +19,7 @@ object `package` extends MillPublishScalaModule { def mvnDeps = Seq( Deps.sourcecode, Deps.osLib, + Deps.osLibWatch, Deps.mainargs, Deps.upickle, Deps.pprint, diff --git a/runner/daemon/src/mill/daemon/MillBuildBootstrap.scala b/runner/daemon/src/mill/daemon/MillBuildBootstrap.scala index a9d7078837c..1c5689ae5b6 100644 --- a/runner/daemon/src/mill/daemon/MillBuildBootstrap.scala +++ b/runner/daemon/src/mill/daemon/MillBuildBootstrap.scala @@ -280,7 +280,7 @@ class MillBuildBootstrap( // look at the `moduleWatched` of one frame up (`prevOuterFrameOpt`), // and not the `moduleWatched` from the current frame (`prevFrameOpt`) val moduleWatchChanged = - prevOuterFrameOpt.exists(_.moduleWatched.exists(w => !Watching.validate(w))) + prevOuterFrameOpt.exists(_.moduleWatched.exists(w => !Watching.validateAnyWatchable(w))) val classLoader = if (runClasspathChanged || moduleWatchChanged) { // Make sure we close the old classloader every time we create a new diff --git a/runner/daemon/src/mill/daemon/MillCliConfig.scala b/runner/daemon/src/mill/daemon/MillCliConfig.scala index e72d0b2d606..38ee92167e6 100644 --- a/runner/daemon/src/mill/daemon/MillCliConfig.scala +++ b/runner/daemon/src/mill/daemon/MillCliConfig.scala @@ -97,6 +97,11 @@ case class MillCliConfig( doc = """Watch and re-run the given tasks when when their inputs change.""" ) watch: Flag = Flag(), + @arg( + name = "watch-via-fs-notify", + doc = "Use filesystem based file watching instead of polling based one (defaults to true)." + ) + watchViaFsNotify: Boolean = true, @arg( short = 's', doc = diff --git a/runner/daemon/src/mill/daemon/MillMain.scala b/runner/daemon/src/mill/daemon/MillMain.scala index 928faa3893b..c7a0c834770 100644 --- a/runner/daemon/src/mill/daemon/MillMain.scala +++ b/runner/daemon/src/mill/daemon/MillMain.scala @@ -350,9 +350,13 @@ object MillMain { if (config.watch.value) os.remove(out / OutFiles.millSelectiveExecution) Watching.watchLoop( ringBell = config.ringBell.value, - watch = config.watch.value, + watch = Option.when(config.watch.value)(Watching.WatchArgs( + setIdle = setIdle, + colors, + useNotify = config.watchViaFsNotify, + serverDir = serverDir + )), streams = streams, - setIdle = setIdle, evaluate = (enterKeyPressed: Boolean, prevState: Option[RunnerState]) => { adjustJvmProperties(userSpecifiedProperties, initialSystemProperties) runMillBootstrap( @@ -361,8 +365,7 @@ object MillMain { config.leftoverArgs.value, streams ) - }, - colors = colors + } ) } } diff --git a/runner/daemon/src/mill/daemon/Watching.scala b/runner/daemon/src/mill/daemon/Watching.scala index d6ec0135108..18125c5431f 100644 --- a/runner/daemon/src/mill/daemon/Watching.scala +++ b/runner/daemon/src/mill/daemon/Watching.scala @@ -2,12 +2,14 @@ package mill.daemon import mill.api.SystemStreams import mill.api.internal.internal -import mill.define.PathRef import mill.define.internal.Watchable +import mill.define.{PathRef, WorkspaceRoot} import mill.internal.Colors import java.io.InputStream +import java.nio.channels.ClosedChannelException import scala.annotation.tailrec +import scala.util.Using /** * Logic around the "watch and wait" functionality in Mill: re-run on change, @@ -17,72 +19,209 @@ import scala.annotation.tailrec object Watching { case class Result[T](watched: Seq[Watchable], error: Option[String], result: T) + trait Evaluate[T] { + def apply(enterKeyPressed: Boolean, previousState: Option[T]): Result[T] + } + + /** + * @param useNotify whether to use filesystem based watcher. If it is false uses polling. + * @param serverDir the directory for storing logs of the mill server + */ + case class WatchArgs( + setIdle: Boolean => Unit, + colors: Colors, + useNotify: Boolean, + serverDir: os.Path + ) + + /** + * @param ringBell whether to emit bells + * @param watch if [[None]] just runs once and returns + */ def watchLoop[T]( ringBell: Boolean, - watch: Boolean, + watch: Option[WatchArgs], streams: SystemStreams, - setIdle: Boolean => Unit, - evaluate: (Boolean, Option[T]) => Result[T], - colors: Colors + evaluate: Evaluate[T] ): (Boolean, T) = { - var prevState: Option[T] = None - var enterKeyPressed = false - while (true) { - val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState) - prevState = Some(result) + def handleError(errorOpt: Option[String]): Unit = { errorOpt.foreach(streams.err.println) - if (ringBell) { - if (errorOpt.isEmpty) println("\u0007") - else { - println("\u0007") - Thread.sleep(250) - println("\u0007") - } - } + doRingBell(hasError = errorOpt.isDefined) + } - if (!watch) { - return (errorOpt.isEmpty, result) - } + def doRingBell(hasError: Boolean): Unit = { + if (!ringBell) return - val alreadyStale = watchables.exists(w => !validate(w)) - enterKeyPressed = false - if (!alreadyStale) { - enterKeyPressed = Watching.watchAndWait(streams, setIdle, streams.in, watchables, colors) + println("\u0007") + if (hasError) { + // If we have an error ring the bell again + Thread.sleep(250) + println("\u0007") } } - ??? + + watch match { + case None => + val Result(watchables, errorOpt, result) = + evaluate(enterKeyPressed = false, previousState = None) + handleError(errorOpt) + (errorOpt.isEmpty, result) + + case Some(watchArgs) => + var prevState: Option[T] = None + var enterKeyPressed = false + + // Exits when the thread gets interruped. + while (true) { + val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState) + prevState = Some(result) + handleError(errorOpt) + + try { + watchArgs.setIdle(true) + enterKeyPressed = watchAndWait(streams, streams.in, watchables, watchArgs) + } finally { + watchArgs.setIdle(false) + } + } + throw new IllegalStateException("unreachable") + } } - def watchAndWait( + private def watchAndWait( streams: SystemStreams, - setIdle: Boolean => Unit, stdin: InputStream, watched: Seq[Watchable], - colors: Colors + watchArgs: WatchArgs ): Boolean = { - setIdle(true) - val watchedPaths = watched.collect { case p: Watchable.Path => p.p } - val watchedValues = watched.size - watchedPaths.size + val (watchedPollables, watchedPathsSeq) = watched.partitionMap { + case w: Watchable.Pollable => Left(w) + case p: Watchable.Path => Right(p) + } + val watchedPathsSet = watchedPathsSeq.iterator.map(p => os.Path(p.p)).toSet + val watchedValueCount = watched.size - watchedPathsSeq.size - val watchedValueStr = if (watchedValues == 0) "" else s" and $watchedValues other values" + val watchedValueStr = + if (watchedValueCount == 0) "" else s" and $watchedValueCount other values" - streams.err.println( - colors.info( - s"Watching for changes to ${watchedPaths.size} paths$watchedValueStr... (Enter to re-run, Ctrl-C to exit)" + streams.err.println { + val viaFsNotify = if (watchArgs.useNotify) " (via fsnotify)" else "" + watchArgs.colors.info( + s"Watching for changes to ${watchedPathsSeq.size} paths$viaFsNotify$watchedValueStr... (Enter to re-run, Ctrl-C to exit)" ).toString - ) + } + + def doWatch(notifiablesChanged: () => Boolean) = { + val enterKeyPressed = statWatchWait(watchedPollables, stdin, notifiablesChanged) + enterKeyPressed + } - val enterKeyPressed = statWatchWait(watched, stdin) - setIdle(false) - enterKeyPressed + def doWatchPolling() = + doWatch(notifiablesChanged = () => watchedPathsSeq.exists(p => !validateAnyWatchable(p))) + + def doWatchFsNotify() = { + Using.resource(os.write.outputStream(watchArgs.serverDir / "fsNotifyWatchLog")) { watchLog => + def writeToWatchLog(s: String): Unit = { + try { + watchLog.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8)) + watchLog.write('\n') + } catch { + case _: ClosedChannelException => /* do nothing, the file is already closed */ + } + } + + @volatile var pathChangesDetected = false + + // oslib watch only works with folders, so we have to watch the parent folders instead + + writeToWatchLog( + s"[watched-paths:unfiltered] ${watchedPathsSet.toSeq.sorted.mkString("\n")}" + ) + + val workspaceRoot = WorkspaceRoot.workspaceRoot + + /** Paths that are descendants of [[workspaceRoot]]. */ + val pathsUnderWorkspaceRoot = watchedPathsSet.filter { path => + val isUnderWorkspaceRoot = path.startsWith(workspaceRoot) + if (!isUnderWorkspaceRoot) { + streams.err.println(watchArgs.colors.error( + s"Watched path $path is outside workspace root $workspaceRoot, this is unsupported." + ).toString()) + } + + isUnderWorkspaceRoot + } + + // If I have 'root/a/b/c' + // + // Then I want to watch: + // root/a/b/c + // root/a/b + // root/a + // root + val filterPaths = pathsUnderWorkspaceRoot.flatMap { path => + path.relativeTo(workspaceRoot).segments.inits.map(segments => workspaceRoot / segments) + } + writeToWatchLog(s"[watched-paths:filtered] ${filterPaths.toSeq.sorted.mkString("\n")}") + + Using.resource(os.watch.watch( + // Just watch the root folder + Seq(workspaceRoot), + filter = path => { + val shouldBeWatched = + filterPaths.contains(path) || watchedPathsSet.exists(watchedPath => + path.startsWith(watchedPath) + ) + writeToWatchLog(s"[filter] (shouldBeWatched=$shouldBeWatched) $path") + shouldBeWatched + }, + onEvent = changedPaths => { + // Make sure that the changed paths are actually the ones in our watch list and not some adjacent files in the + // same folder + val hasWatchedPath = + changedPaths.exists(p => + watchedPathsSet.exists(watchedPath => p.startsWith(watchedPath)) + ) + writeToWatchLog( + s"[changed-paths] (hasWatchedPath=$hasWatchedPath) ${changedPaths.mkString("\n")}" + ) + if (hasWatchedPath) { + pathChangesDetected = true + } + }, + logger = (eventType, data) => + writeToWatchLog(s"[watch:event] $eventType: ${pprint.apply(data).plainText}") + )) { _ => + // If already stale, re-evaluate instantly. + // + // We need to do this to prevent any changes from slipping through the gap between the last evaluation and + // starting the watch. + val alreadyStale = watched.exists(w => !validateAnyWatchable(w)) + + if (alreadyStale) false + else doWatch(notifiablesChanged = () => pathChangesDetected) + } + } + } + + if (watchArgs.useNotify) doWatchFsNotify() + else doWatchPolling() } - // Returns `true` if enter key is pressed to re-run tasks explicitly - def statWatchWait(watched: Seq[Watchable], stdin: InputStream): Boolean = { + /** + * @param notifiablesChanged returns true if any of the notifiables have changed + * + * @return `true` if enter key is pressed to re-run tasks explicitly, false if changes in watched files occured. + */ + def statWatchWait( + watched: Seq[Watchable.Pollable], + stdin: InputStream, + notifiablesChanged: () => Boolean + ): Boolean = { val buffer = new Array[Byte](4 * 1024) @tailrec def statWatchWait0(): Boolean = { - if (watched.forall(w => validate(w))) { + if (!notifiablesChanged() && watched.forall(w => validate(w))) { if (lookForEnterKey()) { true } else { @@ -96,11 +235,13 @@ object Watching { if (stdin.available() == 0) false else stdin.read(buffer) match { case 0 | -1 => false - case n => + case bytesRead => buffer.indexOf('\n') match { case -1 => lookForEnterKey() - case i => - if (i >= n) lookForEnterKey() + case index => + // If we found the newline further than the bytes read, that means it's not from this read and thus we + // should try reading again. + if (index >= bytesRead) lookForEnterKey() else true } } @@ -109,13 +250,23 @@ object Watching { statWatchWait0() } - def validate(w: Watchable) = poll(w) == signature(w) - def poll(w: Watchable) = w match { + /** @return true if the watchable did not change. */ + inline def validate(w: Watchable.Pollable): Boolean = validateAnyWatchable(w) + + /** + * As [[validate]] but accepts any [[Watchable]] for the cases when we do not want to use a notification system. + * + * Normally you should use [[validate]] so that types would guide your implementation. + */ + def validateAnyWatchable(w: Watchable): Boolean = poll(w) == signature(w) + + def poll(w: Watchable): Long = w match { case Watchable.Path(p, quick, sig) => new PathRef(os.Path(p), quick, sig, PathRef.Revalidate.Once).recomputeSig() case Watchable.Value(f, sig, pretty) => f() } - def signature(w: Watchable) = w match { + + def signature(w: Watchable): Long = w match { case Watchable.Path(p, quick, sig) => new PathRef(os.Path(p), quick, sig, PathRef.Revalidate.Once).sig case Watchable.Value(f, sig, pretty) => sig