-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWatchServiceBackedObservable.scala
More file actions
166 lines (157 loc) · 6 KB
/
WatchServiceBackedObservable.scala
File metadata and controls
166 lines (157 loc) · 6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/*
* sbt IO
* Copyright Scala Center, Lightbend, and Mark Harrah
*
* Licensed under Apache License 2.0
* SPDX-License-Identifier: Apache-2.0
*
* See the NOTICE file distributed with this work for
* additional information regarding copyright ownership.
*/
package sbt.internal.nio
import java.io.IOException
import java.nio.file.StandardWatchEventKinds.OVERFLOW
import java.nio.file.{ Path, WatchKey }
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import java.util.concurrent.{ CountDownLatch, TimeUnit }
import sbt.internal.io._
import sbt.internal.nio.FileEvent.{ Creation, Deletion }
import sbt.nio.file.FileAttributes.NonExistent
import sbt.nio.file.Glob._
import sbt.nio.file.{ AnyPath, FileAttributes, FileTreeView, Glob, RecursiveGlob }
import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.concurrent.duration._
import scala.util.control.NonFatal
/**
 * Companion holding state shared by every [[WatchServiceBackedObservable]] instance.
 */
private[sbt] object WatchServiceBackedObservable {

  /**
   * Counter used to give each event thread a distinct name suffix. It starts at zero and is
   * incremented every time an instance creates its event thread.
   */
  private val eventThreadId: AtomicInteger = new AtomicInteger(0)
}
/**
 * An [[Observable]] of file events that is fed by polling the watch service held by `s`.
 *
 * A daemon thread is started while the instance is being constructed. It repeatedly polls
 * `s.service`, converts the raw watch events of the returned key into `FileEvent`s via an
 * internal file cache, and forwards them to every registered observer.
 *
 * @param s            the watch state; supplies the service that is polled and is used to
 *                     register/unregister directories
 * @param delay        the timeout handed to each `s.service.poll` call
 * @param closeService whether [[close]] should also close `s.service`
 * @param logger       receives debug output
 */
private[sbt] class WatchServiceBackedObservable(
    s: NewWatchState,
    delay: FiniteDuration,
    closeService: Boolean,
    logger: WatchLogger
) extends Registerable[FileEvent[FileAttributes]]
    with Observable[FileEvent[FileAttributes]] {
  import WatchServiceBackedObservable.eventThreadId
  private type Event = FileEvent[FileAttributes]
  // Flipped exactly once by close(); the event thread checks it to decide when to stop.
  private val closed = new AtomicBoolean(false)
  private val observers = new Observers[FileEvent[FileAttributes]]
  // Paths whose attributes cannot be read are cached as NonExistent.
  private val fileCache = new FileCache(p => FileAttributes(p).getOrElse(NonExistent))
  private val view: FileTreeView.Nio[FileAttributes] = FileTreeView.default
  // NOTE: `thread` must stay below the fields above. The thread is started as part of this
  // initializer and its body reads `closed`, `observers`, `fileCache` and `view`.
  private val thread: Thread = {
    val latch = new CountDownLatch(1)
    new Thread(s"watch-state-event-thread-${eventThreadId.incrementAndGet()}") {
      setDaemon(true)
      start()
      // run() counts the latch down as its first action; give it five seconds to get there.
      if (!latch.await(5, TimeUnit.SECONDS))
        throw new IllegalStateException("Couldn't start event thread")

      // Polls the service and publishes the resulting events until `closed` is set.
      // NonFatal does not match InterruptedException, so an interrupt escapes this loop and
      // is handled in run().
      @tailrec
      final def loopImpl(): Unit = {
        try {
          if (!closed.get) getFilesForKey(s.service.poll(delay)).foreach { event =>
            observers.onNext(event)
          }
        } catch {
          case NonFatal(e) =>
            logger.debug(
              s"Error getting files from ${s.service}: $e\n${e.getStackTrace.mkString("\n")}"
            )
        }
        if (!closed.get) loopImpl()
      }

      override def run(): Unit = {
        latch.countDown()
        try {
          loopImpl()
        } catch {
          // Interruption is how close() stops this thread; nothing to report.
          case _: InterruptedException =>
        }
      }

      // Converts the pending raw events of `key` into file events. A null key (presumably a
      // poll that timed out -- confirm against NewWatchState's service) yields no events.
      def getFilesForKey(key: WatchKey): Seq[Event] = key match {
        case null => Vector.empty
        case k =>
          // Drain and reset the key under its own monitor.
          val rawEvents = k.synchronized {
            val events = k.pollEvents.asScala.toVector
            k.reset()
            events
          }
          val keyPath = k.watchable.asInstanceOf[Path]
          val allEvents: Seq[Event] = rawEvents.flatMap {
            // On overflow the individual events are not usable, so re-scan everything
            // beneath the watched path instead.
            case e if e.kind.equals(OVERFLOW) =>
              fileCache.refresh(Glob(keyPath, RecursiveGlob))
            // OVERFLOW was handled by the case above, so only the context needs checking.
            case e if e.context != null =>
              val path = keyPath.resolve(e.context.asInstanceOf[Path])
              FileAttributes(path) match {
                case Right(attrs) => fileCache.update(path, attrs)
                case _            => Nil
              }
            case _ => Nil: Seq[Event]
          }
          logger.debug(s"Received events:\n${allEvents.mkString("\n")}")
          // Expands one event into the events that are finally published:
          //  - a created directory is registered with the watch state and everything listed
          //    beneath it is processed as a creation as well;
          //  - a deleted directory triggers a cache refresh of its subtree and every
          //    directory among the resulting events is unregistered;
          //  - any other event is passed through as is.
          // Deletion events are then rebuilt with `isOther = false`, keeping the remaining
          // attribute flags and the original occurrence time.
          def process(event: Event): Seq[Event] = {
            (event match {
              case Creation(path, attrs) if attrs.isDirectory =>
                s.register(path)
                event +: view.list(Glob(path, RecursiveGlob)).flatMap { case (p, a) =>
                  process(Creation(p, a))
                }
              case Deletion(p, attrs) if attrs.isDirectory =>
                val events = fileCache.refresh(Glob(p, RecursiveGlob))
                events.view.filter(_.attributes.isDirectory).foreach(e => s.unregister(e.path))
                events
              case e => e :: Nil
            }).map {
              case d @ Deletion(path, attributes) =>
                val newAttrs = FileAttributes(
                  isDirectory = attributes.isDirectory,
                  isOther = false,
                  isRegularFile = attributes.isRegularFile,
                  isSymbolicLink = attributes.isSymbolicLink
                )
                Deletion(path, newAttrs, d.occurredAt)
              case e => e
            }
          }
          allEvents.flatMap(process)
      }
    }
  }

  /**
   * Adds an observer that receives every event published by the event thread.
   *
   * @param observer the observer to add
   * @return the handle returned by the underlying observers collection
   */
  override def addObserver(observer: Observer[FileEvent[FileAttributes]]): AutoCloseable =
    observers.addObserver(observer)

  /**
   * Stops the event thread and, if `closeService` is set, closes the watch service. Only the
   * first call has any effect.
   *
   * The event thread is interrupted and joined for at most five seconds. `Thread.join`
   * throws `InterruptedException` when the calling thread is interrupted, e.g. when this
   * method is invoked from an observer callback running on the event thread, which has just
   * interrupted itself. That exception still propagates, but the service is released first.
   */
  override def close(): Unit = {
    if (closed.compareAndSet(false, true)) {
      try {
        thread.interrupt()
        thread.join(5.seconds.toMillis)
      } finally {
        // `closed` is already true, so a later close() would be a no-op: if the service is
        // not released here it never will be.
        if (closeService) s.service.close()
      }
      logger.debug("Closed WatchServiceBackedObservable")
    }
  }

  /**
   * Starts watching `glob` and returns an observable restricted to paths matching it.
   *
   * The glob is registered with the file cache, every directory the cache lists for the
   * glob's base (down to the upper bound of `glob.range`, or recursively when that bound is
   * `Int.MaxValue`) is registered with the watch state, and a filtering observer is attached
   * to this instance's observers.
   *
   * @param glob the glob to watch
   * @return `Right` with an observable that only forwards events whose path matches `glob`
   *         (closing it detaches the filter), or `Left` if an `IOException` was thrown
   */
  override def register(glob: Glob): Either[IOException, Observable[FileEvent[FileAttributes]]] = {
    try {
      fileCache.register(glob)
      // Build a glob covering every level beneath the base that may contain directories to
      // watch.
      val updatedGlob = glob.range._2 match {
        case Int.MaxValue => Glob(glob.base, RecursiveGlob)
        case d            => (1 to d).foldLeft(Glob(glob.base)) { case (g, _) => g / AnyPath }
      }
      fileCache.list(updatedGlob) foreach {
        case (path, attrs) if attrs.isDirectory => s.register(path)
        case _                                  =>
      }
      // Forwards only the events whose path matches the registered glob.
      val observable = new Observers[FileEvent[FileAttributes]] {
        override def onNext(t: FileEvent[FileAttributes]): Unit = {
          if (glob.matches(t.path)) super.onNext(t)
        }
      }
      val handle = observers.addObserver(observable)
      Right(new Observable[FileEvent[FileAttributes]] {
        override def addObserver(observer: Observer[FileEvent[FileAttributes]]): AutoCloseable =
          observable.addObserver(observer)
        override def close(): Unit = handle.close()
        override def toString = s"Observable($glob)"
      })
    } catch {
      case e: IOException => Left(e)
    }
  }
}