Skip to content

Commit 5dd2bd3

Browse files
Draft: Implement watching file paths via oslib.watch (for 0.12.x branch) (#5073)
Backport of #5068 The fs watching is *disabled* by default and can be disabled via `--watch-via-fs-notify=true`. --------- Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
1 parent 133a2fc commit 5dd2bd3

File tree

9 files changed

+242
-62
lines changed

9 files changed

+242
-62
lines changed

build.mill

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ object Deps {
150150
val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
151151
val commonsIo = ivy"commons-io:commons-io:2.18.0"
152152
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.24.3"
153-
val osLib = ivy"com.lihaoyi::os-lib:0.11.4-M6"
153+
val osLibVersion = "0.11.5-M8"
154+
val osLib = ivy"com.lihaoyi::os-lib:${osLibVersion}"
155+
val osLibWatch = ivy"com.lihaoyi::os-lib-watch:${osLibVersion}"
154156
val pprint = ivy"com.lihaoyi::pprint:0.9.0"
155157
val mainargs = ivy"com.lihaoyi::mainargs:0.7.6"
156158
val millModuledefsVersion = "0.11.2"

dist/package.mill

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,12 @@ object `package` extends RootModule with InstallModule {
345345

346346
def mainClass = Some("mill.runner.client.MillClientMain")
347347

348-
def nativeImageClasspath = build.runner.client.runClasspath()
348+
// Use assembly jar as the upstream ivy classpath rather than using runClasspath
349+
// directly to try and avoid native image command length problems on windows
350+
def nativeImageClasspath =
351+
Seq(build.runner.client.resolvedIvyAssembly().pathRef) ++
352+
build.runner.client.upstreamLocalAssemblyClasspath() ++
353+
build.runner.client.localClasspath()
349354

350355
def localBinName = "mill-native"
351356

integration/invalidation/watch-source-input/src/WatchSourceInputTests.scala

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,9 @@ trait WatchTests extends UtestIntegrationTestSuite {
4545
val expectedShows0 = mutable.Buffer.empty[String]
4646
val res = f(expectedOut, expectedErr, expectedShows0)
4747
val (shows, out) = res.out.linesIterator.toVector.partition(_.startsWith("\""))
48-
val err = res.err.linesIterator.toVector
49-
.filter(!_.contains("Compiling compiler interface..."))
50-
.filter(!_.contains("Watching for changes"))
51-
.filter(!_.contains("[info] compiling"))
52-
.filter(!_.contains("[info] done compiling"))
53-
.filter(!_.contains("mill-server/ exitCode file not found"))
48+
val err = res.err.linesIterator.toVector.filter(s =>
49+
s.startsWith("Setting up ") || s.startsWith("Running ")
50+
)
5451

5552
assert(out == expectedOut)
5653

main/util/src/mill/util/Watchable.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,16 @@ import mill.api.internal
1010
*/
1111
@internal
1212
private[mill] trait Watchable {
13+
14+
/** @return the hashcode of a watched value. */
1315
def poll(): Long
16+
17+
/** The initial hashcode of a watched value. */
1418
def signature: Long
19+
20+
/** @return true if the watched value has not changed */
1521
def validate(): Boolean = poll() == signature
22+
1623
def pretty: String
1724
}
1825
@internal

runner/package.mill

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ object `package` extends RootModule with build.MillPublishScalaModule {
1313
build.Deps.windowsAnsi,
1414
build.Deps.coursier,
1515
build.Deps.coursierJvm,
16-
build.Deps.logback
16+
build.Deps.logback,
17+
build.Deps.osLibWatch
1718
)
1819
def buildInfoObjectName = "Versions"
1920
def buildInfoMembers = Seq(

runner/src/mill/runner/MillCliConfig.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,12 @@ case class MillCliConfig(
9595
doc = """Watch and re-run the given tasks when when their inputs change."""
9696
)
9797
watch: Flag = Flag(),
98+
@arg(
99+
name = "notify-watch",
100+
doc =
101+
"Use filesystem based file watching instead of polling based one (experimental, defaults to false)."
102+
)
103+
watchViaFsNotify: Boolean = false,
98104
@arg(
99105
short = 's',
100106
doc =

runner/src/mill/runner/MillMain.scala

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -231,9 +231,13 @@ object MillMain {
231231
}
232232
val (isSuccess, evalStateOpt) = Watching.watchLoop(
233233
ringBell = config.ringBell.value,
234-
watch = config.watch.value,
234+
watch = Option.when(config.watch.value)(Watching.WatchArgs(
235+
setIdle,
236+
colors,
237+
useNotify = config.watchViaFsNotify,
238+
serverDir = serverDir
239+
)),
235240
streams = streams,
236-
setIdle = setIdle,
237241
evaluate = (enterKeyPressed: Boolean, prevState: Option[RunnerState]) => {
238242
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)
239243

@@ -285,8 +289,7 @@ object MillMain {
285289
}
286290
}
287291
}
288-
},
289-
colors = colors
292+
}
290293
)
291294
bspContext.foreach { ctx =>
292295
repeatForBsp =

runner/src/mill/runner/Watching.scala

Lines changed: 189 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
package mill.runner
22

3-
import mill.api.internal
3+
import mill.api.{SystemStreams, internal}
44
import mill.util.{Colors, Watchable}
5-
import mill.api.SystemStreams
65

76
import java.io.InputStream
7+
import java.nio.channels.ClosedChannelException
88
import scala.annotation.tailrec
9+
import scala.util.Using
910

1011
/**
1112
* Logic around the "watch and wait" functionality in Mill: re-run on change,
@@ -15,72 +16,211 @@ import scala.annotation.tailrec
1516
object Watching {
1617
case class Result[T](watched: Seq[Watchable], error: Option[String], result: T)
1718

19+
trait Evaluate[T] {
20+
def apply(enterKeyPressed: Boolean, previousState: Option[T]): Result[T]
21+
}
22+
23+
/**
24+
* @param useNotify whether to use filesystem based watcher. If it is false uses polling.
25+
* @param serverDir the directory for storing logs of the mill server
26+
*/
27+
case class WatchArgs(
28+
setIdle: Boolean => Unit,
29+
colors: Colors,
30+
useNotify: Boolean,
31+
serverDir: os.Path
32+
)
33+
1834
def watchLoop[T](
1935
ringBell: Boolean,
20-
watch: Boolean,
36+
watch: Option[WatchArgs],
2137
streams: SystemStreams,
22-
setIdle: Boolean => Unit,
23-
evaluate: (Boolean, Option[T]) => Result[T],
24-
colors: Colors
38+
evaluate: Evaluate[T]
2539
): (Boolean, T) = {
26-
var prevState: Option[T] = None
27-
var enterKeyPressed = false
28-
while (true) {
29-
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
30-
prevState = Some(result)
40+
def handleError(errorOpt: Option[String]): Unit = {
3141
errorOpt.foreach(streams.err.println)
32-
if (ringBell) {
33-
if (errorOpt.isEmpty) println("\u0007")
34-
else {
35-
println("\u0007")
36-
Thread.sleep(250)
37-
println("\u0007")
38-
}
39-
}
42+
doRingBell(hasError = errorOpt.isDefined)
43+
}
4044

41-
if (!watch) {
42-
return (errorOpt.isEmpty, result)
43-
}
45+
def doRingBell(hasError: Boolean): Unit = {
46+
if (!ringBell) return
4447

45-
val alreadyStale = watchables.exists(!_.validate())
46-
enterKeyPressed = false
47-
if (!alreadyStale) {
48-
enterKeyPressed = Watching.watchAndWait(streams, setIdle, streams.in, watchables, colors)
48+
println("\u0007")
49+
if (hasError) {
50+
// If we have an error ring the bell again
51+
Thread.sleep(250)
52+
println("\u0007")
4953
}
5054
}
51-
???
55+
56+
watch match {
57+
case None =>
58+
val Result(watchables, errorOpt, result) =
59+
evaluate(enterKeyPressed = false, previousState = None)
60+
handleError(errorOpt)
61+
(errorOpt.isEmpty, result)
62+
63+
case Some(watchArgs) =>
64+
var prevState: Option[T] = None
65+
var enterKeyPressed = false
66+
67+
// Exits when the thread gets interruped.
68+
while (true) {
69+
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
70+
prevState = Some(result)
71+
handleError(errorOpt)
72+
73+
try {
74+
watchArgs.setIdle(true)
75+
enterKeyPressed = watchAndWait(streams, streams.in, watchables, watchArgs)
76+
} finally {
77+
watchArgs.setIdle(false)
78+
}
79+
}
80+
throw new IllegalStateException("unreachable")
81+
}
5282
}
5383

54-
def watchAndWait(
84+
private def watchAndWait(
5585
streams: SystemStreams,
56-
setIdle: Boolean => Unit,
5786
stdin: InputStream,
5887
watched: Seq[Watchable],
59-
colors: Colors
88+
watchArgs: WatchArgs
6089
): Boolean = {
61-
setIdle(true)
62-
val watchedPaths = watched.collect { case p: Watchable.Path => p.p.path }
63-
val watchedValues = watched.size - watchedPaths.size
90+
val (watchedPollables, watchedPathsSeq) = watched.partitionMap {
91+
case w: Watchable.Value => Left(w)
92+
case p: Watchable.Path => Right(p)
93+
}
94+
val watchedPathsSet = watchedPathsSeq.iterator.map(p => p.p.path).toSet
95+
val watchedValueCount = watched.size - watchedPathsSeq.size
6496

65-
val watchedValueStr = if (watchedValues == 0) "" else s" and $watchedValues other values"
97+
val watchedValueStr =
98+
if (watchedValueCount == 0) "" else s" and $watchedValueCount other values"
6699

67-
streams.err.println(
68-
colors.info(
69-
s"Watching for changes to ${watchedPaths.size} paths$watchedValueStr... (Enter to re-run, Ctrl-C to exit)"
100+
streams.err.println {
101+
val viaFsNotify = if (watchArgs.useNotify) " (via fsnotify)" else ""
102+
watchArgs.colors.info(
103+
s"Watching for changes to ${watchedPathsSeq.size} paths$viaFsNotify$watchedValueStr... (Enter to re-run, Ctrl-C to exit)"
70104
).toString
71-
)
105+
}
106+
107+
def doWatch(notifiablesChanged: () => Boolean) = {
108+
val enterKeyPressed = statWatchWait(watchedPollables, stdin, notifiablesChanged)
109+
enterKeyPressed
110+
}
111+
112+
def doWatchPolling() =
113+
doWatch(notifiablesChanged = () => watchedPathsSeq.exists(p => !p.validate()))
72114

73-
val enterKeyPressed = statWatchWait(watched, stdin)
74-
setIdle(false)
75-
enterKeyPressed
115+
def doWatchFsNotify() = {
116+
Using.resource(os.write.outputStream(watchArgs.serverDir / "fsNotifyWatchLog")) { watchLog =>
117+
def writeToWatchLog(s: String): Unit = {
118+
try {
119+
watchLog.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
120+
watchLog.write('\n')
121+
} catch {
122+
case _: ClosedChannelException => /* do nothing, the file is already closed */
123+
}
124+
}
125+
126+
@volatile var pathChangesDetected = false
127+
128+
// oslib watch only works with folders, so we have to watch the parent folders instead
129+
130+
writeToWatchLog(
131+
s"[watched-paths:unfiltered] ${watchedPathsSet.toSeq.sorted.mkString("\n")}"
132+
)
133+
134+
val workspaceRoot = mill.api.WorkspaceRoot.workspaceRoot
135+
136+
/** Paths that are descendants of [[workspaceRoot]]. */
137+
val pathsUnderWorkspaceRoot = watchedPathsSet.filter { path =>
138+
val isUnderWorkspaceRoot = path.startsWith(workspaceRoot)
139+
if (!isUnderWorkspaceRoot) {
140+
streams.err.println(watchArgs.colors.error(
141+
s"Watched path $path is outside workspace root $workspaceRoot, this is unsupported."
142+
).toString())
143+
}
144+
145+
isUnderWorkspaceRoot
146+
}
147+
148+
// If I have 'root/a/b/c'
149+
//
150+
// Then I want to watch:
151+
// root/a/b/c
152+
// root/a/b
153+
// root/a
154+
// root
155+
//
156+
// We're only setting one `os.watch.watch` on the root, and this makes it sound like
157+
// we're setting multiple. What we're actually doing is choosing the paths we need to watch recursively in
158+
// Linux since inotify is non-recursive by default, since changes in any enclosing folder could result in the
159+
// watched file or folder disappearing (e.g. if the enclosing folder was renamed) and we want to pick up such
160+
// changes.
161+
val filterPaths = pathsUnderWorkspaceRoot.flatMap { path =>
162+
path.relativeTo(workspaceRoot).segments.inits.map(segments => workspaceRoot / segments)
163+
}
164+
writeToWatchLog(s"[watched-paths:filtered] ${filterPaths.toSeq.sorted.mkString("\n")}")
165+
166+
Using.resource(os.watch.watch(
167+
// Just watch the root folder
168+
Seq(workspaceRoot),
169+
filter = path => {
170+
val shouldBeWatched =
171+
filterPaths.contains(path) || watchedPathsSet.exists(watchedPath =>
172+
path.startsWith(watchedPath)
173+
)
174+
writeToWatchLog(s"[filter] (shouldBeWatched=$shouldBeWatched) $path")
175+
shouldBeWatched
176+
},
177+
onEvent = changedPaths => {
178+
// Make sure that the changed paths are actually the ones in our watch list and not some adjacent files in the
179+
// same folder
180+
val hasWatchedPath =
181+
changedPaths.exists(p =>
182+
watchedPathsSet.exists(watchedPath => p.startsWith(watchedPath))
183+
)
184+
writeToWatchLog(
185+
s"[changed-paths] (hasWatchedPath=$hasWatchedPath) ${changedPaths.mkString("\n")}"
186+
)
187+
if (hasWatchedPath) {
188+
pathChangesDetected = true
189+
}
190+
},
191+
logger = (eventType, data) =>
192+
writeToWatchLog(s"[watch:event] $eventType: ${pprint.apply(data).plainText}")
193+
)) { _ =>
194+
// If already stale, re-evaluate instantly.
195+
//
196+
// We need to do this to prevent any changes from slipping through the gap between the last evaluation and
197+
// starting the watch.
198+
val alreadyStale = watched.exists(w => !w.validate())
199+
200+
if (alreadyStale) false
201+
else doWatch(notifiablesChanged = () => pathChangesDetected)
202+
}
203+
}
204+
}
205+
206+
if (watchArgs.useNotify) doWatchFsNotify()
207+
else doWatchPolling()
76208
}
77209

78-
// Returns `true` if enter key is pressed to re-run tasks explicitly
79-
def statWatchWait(watched: Seq[Watchable], stdin: InputStream): Boolean = {
210+
/**
211+
* @param notifiablesChanged returns true if any of the notifiables have changed
212+
*
213+
* @return `true` if enter key is pressed to re-run tasks explicitly, false if changes in watched files occured.
214+
*/
215+
def statWatchWait(
216+
watched: Seq[Watchable],
217+
stdin: InputStream,
218+
notifiablesChanged: () => Boolean
219+
): Boolean = {
80220
val buffer = new Array[Byte](4 * 1024)
81221

82222
@tailrec def statWatchWait0(): Boolean = {
83-
if (watched.forall(_.validate())) {
223+
if (!notifiablesChanged() && watched.forall(_.validate())) {
84224
if (lookForEnterKey()) {
85225
true
86226
} else {
@@ -94,17 +234,18 @@ object Watching {
94234
if (stdin.available() == 0) false
95235
else stdin.read(buffer) match {
96236
case 0 | -1 => false
97-
case n =>
237+
case bytesRead =>
98238
buffer.indexOf('\n') match {
99239
case -1 => lookForEnterKey()
100-
case i =>
101-
if (i >= n) lookForEnterKey()
240+
case index =>
241+
// If we found the newline further than the bytes read, that means it's not from this read and thus we
242+
// should try reading again.
243+
if (index >= bytesRead) lookForEnterKey()
102244
else true
103245
}
104246
}
105247
}
106248

107249
statWatchWait0()
108250
}
109-
110251
}

0 commit comments

Comments
 (0)