Skip to content

Commit 52f372b

Browse files
Watch dir threads (#155)
* New (optional for now) directory watch service This uses one watch service for an entire file system (which means 1 thread) rather than creating a thread per dataset.
1 parent 8fe4963 commit 52f372b

File tree

7 files changed

+771
-394
lines changed

7 files changed

+771
-394
lines changed
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
package gov.noaa.pfel.coastwatch.util;
2+
3+
import java.io.IOException;
4+
import java.nio.file.FileStore;
5+
import java.nio.file.FileSystem;
6+
import java.nio.file.Path;
7+
import java.nio.file.Paths;
8+
import java.nio.file.StandardWatchEventKinds;
9+
import java.nio.file.WatchEvent;
10+
import java.nio.file.WatchKey;
11+
import java.nio.file.WatchService;
12+
import java.util.HashMap;
13+
import java.util.HashSet;
14+
import java.util.concurrent.ConcurrentHashMap;
15+
16+
import com.cohort.array.StringArray;
17+
import com.cohort.util.File2;
18+
import com.cohort.util.String2;
19+
20+
public class SharedWatchService {
21+
public final static WatchEvent.Kind<Path> CREATE = StandardWatchEventKinds.ENTRY_CREATE;
22+
public final static WatchEvent.Kind<Path> DELETE = StandardWatchEventKinds.ENTRY_DELETE;
23+
public final static WatchEvent.Kind<Path> MODIFY = StandardWatchEventKinds.ENTRY_MODIFY;
24+
public final static WatchEvent.Kind<Object> OVERFLOW = StandardWatchEventKinds.OVERFLOW;
25+
26+
private static ConcurrentHashMap<WatchKey, String> keyToDirMap = new ConcurrentHashMap<>();
27+
private static ConcurrentHashMap<String, WatchService> fileSystemToService = new ConcurrentHashMap<>();
28+
private static ConcurrentHashMap<WatchKey, String> keyToHandlerId = new ConcurrentHashMap<>();
29+
private static ConcurrentHashMap<String, WatchUpdateHandler> handlerIdToHandler = new ConcurrentHashMap<>();
30+
31+
/**
32+
* Registers a directory with the shared watch service.
33+
*
34+
* @param tWatchDir the starting directory, with \\ or /, with or without
35+
* trailing slash.
36+
* The results will contain dirs with matching slashes.
37+
* @param tRecursive
38+
* @param tPathRegex null and "" are treated like .* (which matches all)
39+
* @param handler the WatchUpdateHandler that should be called for matching
40+
* events.
41+
* @param handlerId the identifier of the dataset
42+
* @throws various Exceptions if trouble
43+
*/
44+
public static void watchDirectory(String tWatchDir, boolean tRecursive, String tPathRegex,
45+
WatchUpdateHandler handler, String handlerId) throws IOException {
46+
handlerIdToHandler.put(handlerId, handler);
47+
48+
String watchDir = File2.addSlash(tWatchDir);
49+
50+
Path watchPath = Paths.get(watchDir);
51+
if (watchPath == null)
52+
throw new RuntimeException("Directory not found: " + watchDir);
53+
54+
// make the WatchService
55+
FileSystem fs = watchPath.getFileSystem();
56+
if (fs == null)
57+
throw new RuntimeException(
58+
"getFileSystem returned null for the " + watchDir + " path.");
59+
60+
String fsId = SharedWatchService.systemToID(fs);
61+
62+
WatchService watchService = null;
63+
if (fileSystemToService.containsKey(fsId)) {
64+
watchService = fileSystemToService.get(fsId);
65+
}
66+
if (watchService == null) {
67+
watchService = fs.newWatchService();
68+
fileSystemToService.put(fsId, watchService);
69+
}
70+
if (watchService == null)
71+
throw new RuntimeException(
72+
"The OS doesn't support WatchService for that file system.");
73+
if (tRecursive) {
74+
StringArray alps = FileVisitorSubdir.oneStep( // throws IOException if "Too many open files"
75+
watchDir, tPathRegex); // will have matching slashes and trailing slashes
76+
int n = alps.size();
77+
for (int i = 0; i < n; i++) {
78+
register(watchService, alps.get(i), handlerId);
79+
}
80+
} else {
81+
register(watchService, String2.canonical(watchDir), handlerId);
82+
}
83+
}
84+
85+
private static void register(WatchService watchService, String directory, String handler) throws IOException {
86+
WatchKey key = Paths.get(directory).register(watchService,
87+
new WatchEvent.Kind[] { CREATE, DELETE, MODIFY });
88+
keyToDirMap.put(key, directory);
89+
keyToHandlerId.put(key, handler);
90+
}
91+
92+
private static String systemToID(FileSystem system) {
93+
String id = "";
94+
for (FileStore store : system.getFileStores()) {
95+
id += store.name() + store.type();
96+
}
97+
return id;
98+
}
99+
100+
/**
101+
* Process all events for keys queued to the watcher
102+
*
103+
* @throws Throwable
104+
*/
105+
public static void processEvents() throws Throwable {
106+
// for each file service, get watch service
107+
108+
HashMap<String, StringArray> contextsByHandler = new HashMap<>();
109+
HashSet<String> handlerToReset = new HashSet<>();
110+
111+
for (String fsId : fileSystemToService.keySet()) {
112+
WatchService watchService = fileSystemToService.get(fsId);
113+
WatchKey key = null;
114+
while ((key = watchService.poll()) != null) {
115+
116+
String dir = keyToDirMap.get(key);
117+
String handlerId = keyToHandlerId.get(key);
118+
if (dir == null) {
119+
System.err.println("WatchKey not recognized!!");
120+
continue;
121+
}
122+
123+
for (WatchEvent<?> event : key.pollEvents()) {
124+
if (event.kind() == SharedWatchService.OVERFLOW) {
125+
handlerToReset.add(handlerId);
126+
// If we're doing a full reset we don't need to process the events.
127+
break;
128+
}
129+
String context = dir +
130+
(event.context() == null ? "" : event.context());
131+
if (contextsByHandler.containsKey(handlerId)) {
132+
contextsByHandler.get(handlerId).add(context);
133+
} else {
134+
StringArray contexts = new StringArray();
135+
contexts.add(context);
136+
contextsByHandler.put(handlerId, contexts);
137+
}
138+
}
139+
140+
// reset key and remove from set if directory no longer accessible
141+
boolean valid = key.reset();
142+
if (!valid) {
143+
keyToDirMap.remove(key);
144+
keyToHandlerId.remove(key);
145+
146+
// all directories are inaccessible
147+
if (keyToDirMap.isEmpty()) {
148+
break;
149+
}
150+
}
151+
}
152+
}
153+
154+
for (String handlerId : handlerToReset) {
155+
// Remove the contexts for this handler, just trigger the full reset.
156+
contextsByHandler.remove(handlerId);
157+
handlerIdToHandler.get(handlerId).doReload();
158+
}
159+
160+
for (String handlerId : contextsByHandler.keySet()) {
161+
handlerIdToHandler.get(handlerId).handleUpdates(contextsByHandler.get(handlerId));
162+
}
163+
}
164+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package gov.noaa.pfel.coastwatch.util;
2+
3+
import com.cohort.array.StringArray;
4+
5+
public interface WatchUpdateHandler {
6+
public void doReload();
7+
public void handleUpdates(StringArray contexts) throws Throwable;
8+
}

0 commit comments

Comments
 (0)