Skip to content

Draft: Implement watching file paths via oslib.watch (for 0.12.x branch) #5073

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: 0.12.x
Choose a base branch
from
Draft
4 changes: 3 additions & 1 deletion build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,9 @@ object Deps {
val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
val commonsIo = ivy"commons-io:commons-io:2.18.0"
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.24.3"
val osLib = ivy"com.lihaoyi::os-lib:0.11.4-M6"
val osLibVersion = "0.11.5-M8"
val osLib = ivy"com.lihaoyi::os-lib:${osLibVersion}"
val osLibWatch = ivy"com.lihaoyi::os-lib-watch:${osLibVersion}"
val pprint = ivy"com.lihaoyi::pprint:0.9.0"
val mainargs = ivy"com.lihaoyi::mainargs:0.7.6"
val millModuledefsVersion = "0.11.2"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,7 @@ trait WatchTests extends UtestIntegrationTestSuite {
val expectedShows0 = mutable.Buffer.empty[String]
val res = f(expectedOut, expectedErr, expectedShows0)
val (shows, out) = res.out.linesIterator.toVector.partition(_.startsWith("\""))
val err = res.err.linesIterator.toVector
.filter(!_.contains("Compiling compiler interface..."))
.filter(!_.contains("Watching for changes"))
.filter(!_.contains("[info] compiling"))
.filter(!_.contains("[info] done compiling"))
.filter(!_.contains("mill-server/ exitCode file not found"))
val err = res.err.linesIterator.toVector.filter(s => s.startsWith("Setting up ") || s.startsWith("Running "))

assert(out == expectedOut)

Expand Down
7 changes: 7 additions & 0 deletions main/util/src/mill/util/Watchable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,16 @@ import mill.api.internal
*/
@internal
private[mill] trait Watchable {

/** @return the hashcode of a watched value. */
def poll(): Long

/** The initial hashcode of a watched value. */
def signature: Long

/** @return true if the watched value has not changed */
def validate(): Boolean = poll() == signature

def pretty: String
}
@internal
Expand Down
3 changes: 2 additions & 1 deletion runner/package.mill
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ object `package` extends RootModule with build.MillPublishScalaModule {
build.Deps.windowsAnsi,
build.Deps.coursier,
build.Deps.coursierJvm,
build.Deps.logback
build.Deps.logback,
build.Deps.osLibWatch
)
def buildInfoObjectName = "Versions"
def buildInfoMembers = Seq(
Expand Down
6 changes: 6 additions & 0 deletions runner/src/mill/runner/MillCliConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ case class MillCliConfig(
doc = """Watch and re-run the given tasks when when their inputs change."""
)
watch: Flag = Flag(),
@arg(
name = "watch-via-fs-notify",
doc =
"Use filesystem based file watching instead of polling based one (experimental, defaults to false)."
)
watchViaFsNotify: Boolean = false,
@arg(
short = 's',
doc =
Expand Down
11 changes: 7 additions & 4 deletions runner/src/mill/runner/MillMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,13 @@ object MillMain {
}
val (isSuccess, evalStateOpt) = Watching.watchLoop(
ringBell = config.ringBell.value,
watch = config.watch.value,
watch = Option.when(config.watch.value)(Watching.WatchArgs(
setIdle,
colors,
useNotify = config.watchViaFsNotify,
serverDir = serverDir
)),
streams = streams,
setIdle = setIdle,
evaluate = (enterKeyPressed: Boolean, prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

Expand Down Expand Up @@ -285,8 +289,7 @@ object MillMain {
}
}
}
},
colors = colors
}
)
bspContext.foreach { ctx =>
repeatForBsp =
Expand Down
231 changes: 183 additions & 48 deletions runner/src/mill/runner/Watching.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package mill.runner

import mill.api.internal
import mill.api.{SystemStreams, internal}
import mill.util.{Colors, Watchable}
import mill.api.SystemStreams

import java.io.InputStream
import java.nio.channels.ClosedChannelException
import scala.annotation.tailrec
import scala.util.Using

/**
* Logic around the "watch and wait" functionality in Mill: re-run on change,
Expand All @@ -15,72 +16,205 @@ import scala.annotation.tailrec
object Watching {
case class Result[T](watched: Seq[Watchable], error: Option[String], result: T)

trait Evaluate[T] {
def apply(enterKeyPressed: Boolean, previousState: Option[T]): Result[T]
}

/**
* @param useNotify whether to use filesystem based watcher. If it is false uses polling.
* @param serverDir the directory for storing logs of the mill server
*/
case class WatchArgs(
setIdle: Boolean => Unit,
colors: Colors,
useNotify: Boolean,
serverDir: os.Path
)

def watchLoop[T](
ringBell: Boolean,
watch: Boolean,
watch: Option[WatchArgs],
streams: SystemStreams,
setIdle: Boolean => Unit,
evaluate: (Boolean, Option[T]) => Result[T],
colors: Colors
evaluate: Evaluate[T]
): (Boolean, T) = {
var prevState: Option[T] = None
var enterKeyPressed = false
while (true) {
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
prevState = Some(result)
def handleError(errorOpt: Option[String]): Unit = {
errorOpt.foreach(streams.err.println)
if (ringBell) {
if (errorOpt.isEmpty) println("\u0007")
else {
println("\u0007")
Thread.sleep(250)
println("\u0007")
}
}
doRingBell(hasError = errorOpt.isDefined)
}

if (!watch) {
return (errorOpt.isEmpty, result)
}
def doRingBell(hasError: Boolean): Unit = {
if (!ringBell) return

val alreadyStale = watchables.exists(!_.validate())
enterKeyPressed = false
if (!alreadyStale) {
enterKeyPressed = Watching.watchAndWait(streams, setIdle, streams.in, watchables, colors)
println("\u0007")
if (hasError) {
// If we have an error ring the bell again
Thread.sleep(250)
println("\u0007")
}
}
???

watch match {
case None =>
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed = false, previousState = None)
handleError(errorOpt)
(errorOpt.isEmpty, result)

case Some(watchArgs) =>
var prevState: Option[T] = None
var enterKeyPressed = false

// Exits when the thread gets interruped.
while (true) {
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
prevState = Some(result)
handleError(errorOpt)

try {
watchArgs.setIdle(true)
enterKeyPressed = watchAndWait(streams, streams.in, watchables, watchArgs)
}
finally {
watchArgs.setIdle(false)
}
}
throw new IllegalStateException("unreachable")
}
}

def watchAndWait(
private def watchAndWait(
streams: SystemStreams,
setIdle: Boolean => Unit,
stdin: InputStream,
watched: Seq[Watchable],
colors: Colors
watchArgs: WatchArgs
): Boolean = {
setIdle(true)
val watchedPaths = watched.collect { case p: Watchable.Path => p.p.path }
val watchedValues = watched.size - watchedPaths.size
val (watchedPollables, watchedPathsSeq) = watched.partitionMap {
case w: Watchable.Value => Left(w)
case p: Watchable.Path => Right(p)
}
val watchedPathsSet = watchedPathsSeq.iterator.map(p => p.p.path).toSet
val watchedValueCount = watched.size - watchedPathsSeq.size

val watchedValueStr = if (watchedValues == 0) "" else s" and $watchedValues other values"
val watchedValueStr =
if (watchedValueCount == 0) "" else s" and $watchedValueCount other values"

streams.err.println(
colors.info(
s"Watching for changes to ${watchedPaths.size} paths$watchedValueStr... (Enter to re-run, Ctrl-C to exit)"
streams.err.println {
val viaFsNotify = if (watchArgs.useNotify) " (via fsnotify)" else ""
watchArgs.colors.info(
s"Watching for changes to ${watchedPathsSeq.size} paths$viaFsNotify$watchedValueStr... (Enter to re-run, Ctrl-C to exit)"
).toString
)
}

def doWatch(notifiablesChanged: () => Boolean) = {
val enterKeyPressed = statWatchWait(watchedPollables, stdin, notifiablesChanged)
enterKeyPressed
}

def doWatchPolling() =
doWatch(notifiablesChanged = () => watchedPathsSeq.exists(p => !p.validate()))

val enterKeyPressed = statWatchWait(watched, stdin)
setIdle(false)
enterKeyPressed
def doWatchFsNotify() = {
Using.resource(os.write.outputStream(watchArgs.serverDir / "fsNotifyWatchLog")) { watchLog =>
def writeToWatchLog(s: String): Unit = {
try {
watchLog.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
watchLog.write('\n')
} catch {
case _: ClosedChannelException => /* do nothing, the file is already closed */
}
}

@volatile var pathChangesDetected = false

// oslib watch only works with folders, so we have to watch the parent folders instead

writeToWatchLog(
s"[watched-paths:unfiltered] ${watchedPathsSet.toSeq.sorted.mkString("\n")}"
)

val workspaceRoot = mill.api.WorkspaceRoot.workspaceRoot

/** Paths that are descendants of [[workspaceRoot]]. */
val pathsUnderWorkspaceRoot = watchedPathsSet.filter { path =>
val isUnderWorkspaceRoot = path.startsWith(workspaceRoot)
if (!isUnderWorkspaceRoot) {
streams.err.println(watchArgs.colors.error(
s"Watched path $path is outside workspace root $workspaceRoot, this is unsupported."
).toString())
}

isUnderWorkspaceRoot
}

// If I have 'root/a/b/c'
//
// Then I want to watch:
// root/a/b/c
// root/a/b
// root/a
// root
val filterPaths = pathsUnderWorkspaceRoot.flatMap { path =>
path.relativeTo(workspaceRoot).segments.inits.map(segments => workspaceRoot / segments)
}
writeToWatchLog(s"[watched-paths:filtered] ${filterPaths.toSeq.sorted.mkString("\n")}")

Using.resource(os.watch.watch(
// Just watch the root folder
Seq(workspaceRoot),
filter = path => {
val shouldBeWatched =
filterPaths.contains(path) || watchedPathsSet.exists(watchedPath =>
path.startsWith(watchedPath)
)
writeToWatchLog(s"[filter] (shouldBeWatched=$shouldBeWatched) $path")
shouldBeWatched
},
onEvent = changedPaths => {
// Make sure that the changed paths are actually the ones in our watch list and not some adjacent files in the
// same folder
val hasWatchedPath =
changedPaths.exists(p =>
watchedPathsSet.exists(watchedPath => p.startsWith(watchedPath))
)
writeToWatchLog(
s"[changed-paths] (hasWatchedPath=$hasWatchedPath) ${changedPaths.mkString("\n")}"
)
if (hasWatchedPath) {
pathChangesDetected = true
}
},
logger = (eventType, data) =>
writeToWatchLog(s"[watch:event] $eventType: ${pprint.apply(data).plainText}")
)) { _ =>
// If already stale, re-evaluate instantly.
//
// We need to do this to prevent any changes from slipping through the gap between the last evaluation and
// starting the watch.
val alreadyStale = watched.exists(w => !w.validate())

if (alreadyStale) false
else doWatch(notifiablesChanged = () => pathChangesDetected)
}
}
}

if (watchArgs.useNotify) doWatchFsNotify()
else doWatchPolling()
}

// Returns `true` if enter key is pressed to re-run tasks explicitly
def statWatchWait(watched: Seq[Watchable], stdin: InputStream): Boolean = {
/**
* @param notifiablesChanged returns true if any of the notifiables have changed
*
* @return `true` if enter key is pressed to re-run tasks explicitly, false if changes in watched files occured.
*/
def statWatchWait(
watched: Seq[Watchable],
stdin: InputStream,
notifiablesChanged: () => Boolean
): Boolean = {
val buffer = new Array[Byte](4 * 1024)

@tailrec def statWatchWait0(): Boolean = {
if (watched.forall(_.validate())) {
if (!notifiablesChanged() && watched.forall(_.validate())) {
if (lookForEnterKey()) {
true
} else {
Expand All @@ -94,17 +228,18 @@ object Watching {
if (stdin.available() == 0) false
else stdin.read(buffer) match {
case 0 | -1 => false
case n =>
case bytesRead =>
buffer.indexOf('\n') match {
case -1 => lookForEnterKey()
case i =>
if (i >= n) lookForEnterKey()
case index =>
// If we found the newline further than the bytes read, that means it's not from this read and thus we
// should try reading again.
if (index >= bytesRead) lookForEnterKey()
else true
}
}
}

statWatchWait0()
}

}
Loading
Loading