Skip to content

Commit 73b6b6f

Browse files
committed
refactor so that watch starts before evaluate loop
1 parent dfbba1f commit 73b6b6f

File tree

2 files changed

+134
-110
lines changed

2 files changed

+134
-110
lines changed

integration/invalidation/watch-source-input/src/WatchSourceInputTests.scala

+1-6
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ object WatchSourceTests extends WatchTests {
8383
"Running qux foo contents initial-foo1 initial-foo2 Running qux bar contents initial-bar"
8484
)
8585

86-
Thread.sleep(1000) // Wait for the watching to begin
8786
os.write.over(workspacePath / "foo1.txt", "edited-foo1")
8887
awaitCompletionMarker(tester, "quxRan1")
8988
expectedErr.append(
@@ -93,8 +92,7 @@ object WatchSourceTests extends WatchTests {
9392
expectedShows.append(
9493
"Running qux foo contents edited-foo1 initial-foo2 Running qux bar contents initial-bar"
9594
)
96-
97-
Thread.sleep(1000) // Wait for the watching to begin
95+
9896
os.write.over(workspacePath / "foo2.txt", "edited-foo2")
9997
awaitCompletionMarker(tester, "quxRan2")
10098
expectedErr.append(
@@ -105,7 +103,6 @@ object WatchSourceTests extends WatchTests {
105103
"Running qux foo contents edited-foo1 edited-foo2 Running qux bar contents initial-bar"
106104
)
107105

108-
Thread.sleep(1000) // Wait for the watching to begin
109106
os.write.over(workspacePath / "bar.txt", "edited-bar")
110107
awaitCompletionMarker(tester, "quxRan3")
111108
expectedErr.append(
@@ -116,7 +113,6 @@ object WatchSourceTests extends WatchTests {
116113
"Running qux foo contents edited-foo1 edited-foo2 Running qux bar contents edited-bar"
117114
)
118115

119-
Thread.sleep(1000) // Wait for the watching to begin
120116
os.write.append(workspacePath / "build.mill", "\ndef unrelated = true")
121117
awaitCompletionMarker(tester, "initialized1")
122118
expectedOut.append(
@@ -128,7 +124,6 @@ object WatchSourceTests extends WatchTests {
128124
)
129125

130126
if (show) expectedOut.append("{}")
131-
Thread.sleep(1000) // Wait for the watching to begin
132127
os.write.over(workspacePath / "watchValue.txt", "exit")
133128
awaitCompletionMarker(tester, "initialized2")
134129
expectedOut.append("Setting up build.mill")

runner/daemon/src/mill/daemon/Watching.scala

+133-104
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ package mill.daemon
33
import mill.api.SystemStreams
44
import mill.api.internal.internal
55
import mill.define.internal.Watchable
6-
import mill.define.{PathRef, WorkspaceRoot}
6+
import mill.define.PathRef
7+
import mill.define.WorkspaceRoot.workspaceRoot
78
import mill.internal.Colors
89

910
import java.io.InputStream
@@ -34,6 +35,12 @@ object Watching {
3435
serverDir: os.Path
3536
)
3637

38+
private case class WatchViaNotifyArgs(
39+
notifiablesChanged: () => Boolean
40+
)
41+
42+
class MutableCell[A](var value: A)
43+
3744
/**
3845
* @param ringBell whether to emit bells
3946
* @param watch if [[None]] just runs once and returns
@@ -71,40 +78,141 @@ object Watching {
7178
var prevState: Option[T] = None
7279
var enterKeyPressed = false
7380

74-
// Exits when the thread gets interruped.
75-
while (true) {
81+
def evaluateOnce() = {
7682
val Result(watchables, errorOpt, result) = evaluate(enterKeyPressed, prevState)
7783
prevState = Some(result)
7884
handleError(errorOpt)
85+
WatchedFiles(watchables, streams, watchArgs.colors)
86+
}
7987

80-
// Do not enter watch if already stale, re-evaluate instantly.
81-
val alreadyStale = watchables.exists(w => !validateAnyWatchable(w))
82-
if (alreadyStale) {
83-
enterKeyPressed = false
84-
} else {
85-
enterKeyPressed = watchAndWait(streams, streams.in, watchables, watchArgs)
88+
def loop(watchViaNotify: Option[WatchViaNotifyArgs]): Nothing = {
89+
// Exits when the thread gets interruped.
90+
while (true) {
91+
val watchables = evaluateOnce()
92+
93+
// Do not enter watch if already stale, re-evaluate instantly.
94+
val alreadyStale = watchables.watched.exists(w => !validateAnyWatchable(w))
95+
if (alreadyStale) {
96+
enterKeyPressed = false
97+
} else {
98+
enterKeyPressed = watchAndWait(streams, streams.in, watchables, watchArgs, watchViaNotify)
99+
}
86100
}
101+
throw new IllegalStateException("unreachable")
87102
}
88-
throw new IllegalStateException("unreachable")
103+
104+
if (watchArgs.useNotify) {
105+
Using.resource(os.write.outputStream(watchArgs.serverDir / "fsNotifyWatchLog")) { watchLog =>
106+
def writeToWatchLog(s: String): Unit = {
107+
try {
108+
watchLog.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
109+
watchLog.write('\n')
110+
} catch {
111+
case _: ClosedChannelException => /* do nothing, the file is already closed */
112+
}
113+
}
114+
115+
val watchedFiles = evaluateOnce()
116+
writeToWatchLog(s"[watched-paths:unfiltered] ${watchedFiles.watchedPathsSet.toSeq.sorted.mkString("\n")}")
117+
writeToWatchLog(s"[watched-paths:filtered] ${watchedFiles.filterPaths.toSeq.sorted.mkString("\n")}")
118+
119+
// Start the watch before entering the evaluation loop to make sure no events fall through.
120+
@volatile var pathChangesDetected = false
121+
Using.resource(os.watch.watch(
122+
// Just watch the root folder
123+
Seq(workspaceRoot),
124+
filter = path => {
125+
val shouldBeWatched =
126+
watchedFiles.filterPaths.contains(path) || watchedFiles.watchedPathsSet.exists(watchedPath =>
127+
path.startsWith(watchedPath)
128+
)
129+
writeToWatchLog(s"[filter] (shouldBeWatched=$shouldBeWatched) $path")
130+
shouldBeWatched
131+
},
132+
onEvent = changedPaths => {
133+
// Make sure that the changed paths are actually the ones in our watch list and not some adjacent files in the
134+
// same folder
135+
val hasWatchedPath =
136+
changedPaths.exists(p =>
137+
watchedFiles.watchedPathsSet.exists(watchedPath => p.startsWith(watchedPath))
138+
)
139+
writeToWatchLog(
140+
s"[changed-paths] (hasWatchedPath=$hasWatchedPath) ${changedPaths.mkString("\n")}"
141+
)
142+
if (hasWatchedPath) {
143+
pathChangesDetected = true
144+
}
145+
},
146+
logger = (eventType, data) =>
147+
writeToWatchLog(s"[watch:event] $eventType: ${pprint.apply(data).plainText}")
148+
)) { _ =>
149+
loop(Some(WatchViaNotifyArgs(notifiablesChanged = () => pathChangesDetected)))
150+
}
151+
}
152+
} else loop(watchViaNotify = None)
89153
}
90154
}
91155

92-
def watchAndWait(
156+
private case class WatchedFiles(
157+
watched: Seq[Watchable],
158+
watchedPollables: Seq[Watchable.Pollable],
159+
watchedPathsSeq: Seq[Watchable.Path],
160+
watchedPathsSet: Set[os.Path],
161+
filterPaths: Set[os.Path]
162+
) {
163+
def watchedValueCount: Int = watched.size - watchedPathsSeq.size
164+
165+
def watchedValueStr: String =
166+
if (watchedValueCount == 0) "" else s" and $watchedValueCount other values"
167+
}
168+
private object WatchedFiles {
169+
def apply(watched: Seq[Watchable], streams: SystemStreams, colors: Colors): WatchedFiles = {
170+
val (watchedPollables, watchedPathsSeq) = watched.partitionMap {
171+
case w: Watchable.Pollable => Left(w)
172+
case p: Watchable.Path => Right(p)
173+
}
174+
175+
val watchedPathsSet: Set[os.Path] = watchedPathsSeq.iterator.map(p => os.Path(p.p)).toSet
176+
177+
/** Paths that are descendants of [[workspaceRoot]]. */
178+
val pathsUnderWorkspaceRoot = {
179+
watchedPathsSet.filter { path =>
180+
val isUnderWorkspaceRoot = path.startsWith(workspaceRoot)
181+
if (!isUnderWorkspaceRoot) streams.err.println(colors.error(
182+
s"Watched path $path is outside workspace root $workspaceRoot, this is unsupported."
183+
).toString())
184+
185+
isUnderWorkspaceRoot
186+
}
187+
}
188+
189+
// If I have 'root/a/b/c'
190+
//
191+
// Then I want to watch:
192+
// root/a/b/c
193+
// root/a/b
194+
// root/a
195+
// root
196+
val filterPaths = pathsUnderWorkspaceRoot.flatMap { path =>
197+
path.relativeTo(workspaceRoot).segments.inits.map(segments => workspaceRoot / segments)
198+
}
199+
200+
apply(
201+
watched = watched, watchedPollables = watchedPollables, watchedPathsSeq = watchedPathsSeq,
202+
watchedPathsSet = watchedPathsSet, filterPaths = filterPaths
203+
)
204+
}
205+
}
206+
207+
private def watchAndWait(
93208
streams: SystemStreams,
94209
stdin: InputStream,
95-
watched: Seq[Watchable],
96-
watchArgs: WatchArgs
210+
watched: WatchedFiles,
211+
watchArgs: WatchArgs,
212+
watchViaNotifyArgs: Option[WatchViaNotifyArgs]
97213
): Boolean = {
98214
watchArgs.setIdle(true)
99-
val (watchedPollables, watchedPathsSeq) = watched.partitionMap {
100-
case w: Watchable.Pollable => Left(w)
101-
case p: Watchable.Path => Right(p)
102-
}
103-
val watchedPathsSet = watchedPathsSeq.iterator.map(p => os.Path(p.p)).toSet
104-
val watchedValueCount = watched.size - watchedPathsSeq.size
105-
106-
val watchedValueStr =
107-
if (watchedValueCount == 0) "" else s" and $watchedValueCount other values"
215+
import watched.{watchedValueStr, watchedPollables, watchedPathsSeq}
108216

109217
streams.err.println {
110218
val viaFsNotify = if (watchArgs.useNotify) " (via fsnotify)" else ""
@@ -119,89 +227,10 @@ object Watching {
119227
enterKeyPressed
120228
}
121229

122-
def doWatchPolling() =
123-
doWatch(notifiablesChanged = () => watchedPathsSeq.exists(p => !validateAnyWatchable(p)))
124-
125-
def doWatchFsNotify() = {
126-
Using.resource(os.write.outputStream(watchArgs.serverDir / "fsNotifyWatchLog")) { watchLog =>
127-
def writeToWatchLog(s: String): Unit = {
128-
try {
129-
watchLog.write(s.getBytes(java.nio.charset.StandardCharsets.UTF_8))
130-
watchLog.write('\n')
131-
} catch {
132-
case _: ClosedChannelException => /* do nothing, the file is already closed */
133-
}
134-
}
135-
136-
@volatile var pathChangesDetected = false
137-
138-
// oslib watch only works with folders, so we have to watch the parent folders instead
139-
140-
writeToWatchLog(
141-
s"[watched-paths:unfiltered] ${watchedPathsSet.toSeq.sorted.mkString("\n")}"
142-
)
143-
144-
val workspaceRoot = WorkspaceRoot.workspaceRoot
145-
146-
/** Paths that are descendants of [[workspaceRoot]]. */
147-
val pathsUnderWorkspaceRoot = watchedPathsSet.filter { path =>
148-
val isUnderWorkspaceRoot = path.startsWith(workspaceRoot)
149-
if (!isUnderWorkspaceRoot) {
150-
streams.err.println(watchArgs.colors.error(
151-
s"Watched path $path is outside workspace root $workspaceRoot, this is unsupported."
152-
).toString())
153-
}
154-
155-
isUnderWorkspaceRoot
156-
}
157-
158-
// If I have 'root/a/b/c'
159-
//
160-
// Then I want to watch:
161-
// root/a/b/c
162-
// root/a/b
163-
// root/a
164-
// root
165-
val filterPaths = pathsUnderWorkspaceRoot.flatMap { path =>
166-
path.relativeTo(workspaceRoot).segments.inits.map(segments => workspaceRoot / segments)
167-
}
168-
writeToWatchLog(s"[watched-paths:filtered] ${filterPaths.toSeq.sorted.mkString("\n")}")
169-
170-
Using.resource(os.watch.watch(
171-
// Just watch the root folder
172-
Seq(workspaceRoot),
173-
filter = path => {
174-
val shouldBeWatched =
175-
filterPaths.contains(path) || watchedPathsSet.exists(watchedPath =>
176-
path.startsWith(watchedPath)
177-
)
178-
writeToWatchLog(s"[filter] (shouldBeWatched=$shouldBeWatched) $path")
179-
shouldBeWatched
180-
},
181-
onEvent = changedPaths => {
182-
// Make sure that the changed paths are actually the ones in our watch list and not some adjacent files in the
183-
// same folder
184-
val hasWatchedPath =
185-
changedPaths.exists(p =>
186-
watchedPathsSet.exists(watchedPath => p.startsWith(watchedPath))
187-
)
188-
writeToWatchLog(
189-
s"[changed-paths] (hasWatchedPath=$hasWatchedPath) ${changedPaths.mkString("\n")}"
190-
)
191-
if (hasWatchedPath) {
192-
pathChangesDetected = true
193-
}
194-
},
195-
logger = (eventType, data) =>
196-
writeToWatchLog(s"[watch:event] $eventType: ${pprint.apply(data).plainText}")
197-
)) { _ =>
198-
doWatch(notifiablesChanged = () => pathChangesDetected)
199-
}
200-
}
230+
watchViaNotifyArgs match {
231+
case Some(notifyArgs) => doWatch(notifyArgs.notifiablesChanged)
232+
case None => doWatch(notifiablesChanged = () => watchedPathsSeq.exists(p => !validateAnyWatchable(p)))
201233
}
202-
203-
if (watchArgs.useNotify) doWatchFsNotify()
204-
else doWatchPolling()
205234
}
206235

207236
/**

0 commit comments

Comments
 (0)