Skip to content

Commit 5131afc

Browse files
committed
Add DirectoryListener (#702)
1 parent 1535504 commit 5131afc

3 files changed

Lines changed: 324 additions & 0 deletions

File tree

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
/*
2+
* Copyright 2026, hbz
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.metafacture.io;
18+
19+
import org.metafacture.framework.FluxCommand;
20+
import org.metafacture.framework.ObjectReceiver;
21+
import org.metafacture.framework.annotations.Description;
22+
import org.metafacture.framework.annotations.In;
23+
import org.metafacture.framework.annotations.Out;
24+
import org.metafacture.framework.helpers.DefaultObjectPipe;
25+
26+
import java.io.IOException;
27+
import java.nio.file.FileSystems;
28+
import java.nio.file.FileVisitResult;
29+
import java.nio.file.Files;
30+
import java.nio.file.LinkOption;
31+
import java.nio.file.Path;
32+
import java.nio.file.SimpleFileVisitor;
33+
import java.nio.file.WatchEvent;
34+
import java.nio.file.WatchKey;
35+
import java.nio.file.WatchService;
36+
import java.nio.file.attribute.BasicFileAttributes;
37+
import java.util.HashMap;
38+
import java.util.Map;
39+
40+
/**
41+
* Listens to a directory and passes occurring filenames to the receiver.
42+
* If a file named {@value TRIGGER_SHUTDOWN_FILENAME} appears the process
43+
* is closed.
44+
* Keep bug @see <a href="https://bugs.openjdk.org/browse/JDK-8202759">JDK-8202759</a>
45+
* in mind: if files occur too fast the files may be missed by the watcher.
46+
*
47+
* @author Pascal Christoph (dr0i)
48+
*/
49+
@Description("Listens to a directory and passes occurring filenames to the receiver. " +
50+
"If a file named 'shutdownEtlNow' appears the process " +
51+
"is closed." +
52+
"Keep bug https://bugs.openjdk.org/browse/JDK-8202759 " +
53+
"in mind: if files occur too fast the files may be missed by the watcher.")
54+
@In(String.class)
55+
@Out(String.class)
56+
@FluxCommand("listen-directory")
57+
public final class DirectoryListener extends DefaultObjectPipe<String, ObjectReceiver<String>> {
58+
59+
/* This special filename triggers the end of listing and closes the module */
60+
public static final String TRIGGER_SHUTDOWN_FILENAME = "shutdownEtlNow";
61+
private static final WatchService WATCHER;
62+
63+
static {
64+
try {
65+
WATCHER = FileSystems.getDefault().newWatchService();
66+
}
67+
catch (final IOException e) {
68+
throw new RuntimeException(e);
69+
}
70+
}
71+
72+
private static final Map<WatchKey, Path> KEYS = new HashMap<>();
73+
74+
/**
75+
* Creates an instance of {@link DirectoryListener} if no IOException occurs.
76+
*/
77+
public DirectoryListener() {
78+
}
79+
80+
@Override
81+
public void process(final String directory) {
82+
83+
final Path dir = Path.of(directory);
84+
try {
85+
registerAll(dir);
86+
}
87+
catch (final IOException e) {
88+
throw new RuntimeException(e);
89+
}
90+
start(directory);
91+
}
92+
93+
private void start(final String directory) {
94+
final DirectoryWatcher directoryWatcher = new DirectoryWatcher();
95+
directoryWatcher.setDirectory(directory);
96+
final Thread thread = new Thread(directoryWatcher);
97+
thread.start();
98+
}
99+
100+
/**
101+
* Register the given directory with the WatchService.
102+
*
103+
* @param dir the directory to register
104+
*/
105+
private void register(final Path dir) throws IOException {
106+
final WatchKey key = dir.register(WATCHER, java.nio.file.StandardWatchEventKinds.ENTRY_CREATE, java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY);
107+
System.out.println("Add directory to watch: " + dir);
108+
KEYS.put(key, dir);
109+
}
110+
111+
/**
112+
* Register the given directory, and all its subdirectories, with the
113+
* WatchService.
114+
*
115+
* @param start root directory for registering all (sub)directories
116+
*/
117+
private void registerAll(final Path start) throws IOException {
118+
Files.walkFileTree(start, new SimpleFileVisitor<>() {
119+
@Override
120+
public FileVisitResult preVisitDirectory(final Path dir, final BasicFileAttributes attrs)
121+
throws IOException {
122+
register(dir);
123+
return FileVisitResult.CONTINUE;
124+
}
125+
});
126+
}
127+
128+
final class DirectoryWatcher implements Runnable {
129+
private String directory;
130+
131+
DirectoryWatcher() {
132+
}
133+
134+
private void setDirectory(final String directory) {
135+
this.directory = directory;
136+
}
137+
138+
public void run() {
139+
140+
while (true) {
141+
final WatchKey key;
142+
try {
143+
key = WATCHER.take();
144+
}
145+
catch (final InterruptedException e) {
146+
Thread.currentThread().interrupt();
147+
return;
148+
}
149+
final Path dir = KEYS.get(key);
150+
if (dir == null) {
151+
System.err.println("WatchKey not recognized!");
152+
continue;
153+
}
154+
155+
for (final WatchEvent<?> event : key.pollEvents()) {
156+
// an OVERFLOW event can occur if events are lost or discarded
157+
if (event.kind() == java.nio.file.StandardWatchEventKinds.OVERFLOW) {
158+
throw new OpenFailed("Overflow event occurred on directory " + directory);
159+
}
160+
System.out.println("Event kind:" + event.kind() + ". File affected: " + event.context() + ".");
161+
162+
@SuppressWarnings("unchecked")
163+
final Path fileName = ((WatchEvent<Path>) event).context();
164+
final Path absolutePath = dir.resolve(fileName);
165+
166+
processFile(fileName, absolutePath);
167+
}
168+
// reset key and remove from set if directory no longer accessible
169+
final boolean valid = key.reset();
170+
if (!valid) {
171+
KEYS.remove(key);
172+
// all directories are inaccessible
173+
if (KEYS.isEmpty()) {
174+
break;
175+
}
176+
}
177+
}
178+
}
179+
180+
private void processFile(final Path fileName, final Path absolutePath) {
181+
if (Files.isDirectory(absolutePath, LinkOption.NOFOLLOW_LINKS)) {
182+
try {
183+
registerAll(absolutePath);
184+
}
185+
catch (final IOException e) {
186+
throw new OpenFailed("IOException event occurred on directory " + directory, e);
187+
}
188+
}
189+
else {
190+
if (fileName.toString().equals(TRIGGER_SHUTDOWN_FILENAME)) {
191+
closeStream();
192+
Thread.currentThread().interrupt();
193+
}
194+
else {
195+
getReceiver().process(absolutePath.toString());
196+
}
197+
}
198+
}
199+
}
200+
}

metafacture-io/src/main/resources/flux-commands.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,4 @@ as-records org.metafacture.io.RecordReader
2323
open-resource org.metafacture.io.ResourceOpener
2424
open-tar org.metafacture.io.TarReader
2525
open-sru org.metafacture.io.SruOpener
26+
listen-directory org.metafacture.io.DirectoryListener
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Copyright 2026, hbz
3+
*
4+
* Licensed under the Apache License, Version 2.0 the "License";
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.metafacture.io;
18+
19+
import org.metafacture.framework.ObjectReceiver;
20+
21+
import org.junit.Assert;
22+
import org.junit.Before;
23+
import org.junit.Rule;
24+
import org.junit.Test;
25+
import org.junit.rules.TemporaryFolder;
26+
import org.mockito.Mock;
27+
import org.mockito.Mockito;
28+
import org.mockito.junit.MockitoJUnit;
29+
import org.mockito.junit.MockitoRule;
30+
31+
import java.io.File;
32+
import java.io.IOException;
33+
import java.nio.file.Files;
34+
import java.nio.file.Path;
35+
36+
/**
37+
* Tests for class {@link DirectoryListener}.
38+
*
39+
* @author Pascal Christoph (dr0i)
40+
*/
41+
public final class DirectoryListenerTest {
42+
43+
private static final DirectoryListener DIRECTORY_LISTENER = new DirectoryListener();
44+
private static final int MAX_MILLISECONDS_WAITING_OF_THREAD = 3000;
45+
private static final String FILE_NAME = "test";
46+
private static final String SUBDIRECTORY_NAME = "subdir";
47+
48+
@Rule
49+
public MockitoRule mockitoRule = MockitoJUnit.rule();
50+
51+
@Rule
52+
public TemporaryFolder tempFolder = new TemporaryFolder();
53+
54+
private String pathToDirectory;
55+
private String pathToSubdirectory;
56+
57+
@Mock
58+
private ObjectReceiver<String> receiver;
59+
60+
public DirectoryListenerTest() {
61+
}
62+
63+
@Before
64+
public void setup() {
65+
pathToDirectory = tempFolder.getRoot() + File.separator;
66+
DIRECTORY_LISTENER.setReceiver(receiver);
67+
DIRECTORY_LISTENER.process(pathToDirectory);
68+
pathToSubdirectory = pathToDirectory + SUBDIRECTORY_NAME + File.separator;
69+
}
70+
71+
@Test
72+
public void testFileOccurs() {
73+
final String pathToTestfile = pathToDirectory + FILE_NAME;
74+
createFile(pathToTestfile);
75+
Mockito.verify(receiver, org.mockito.Mockito.timeout(MAX_MILLISECONDS_WAITING_OF_THREAD)).process(pathToTestfile);
76+
}
77+
78+
@Test
79+
public void testFileOccursInSubdirectory() throws InterruptedException {
80+
createDirectory(pathToSubdirectory);
81+
final String pathToTestfile = pathToSubdirectory + FILE_NAME;
82+
Thread.sleep(100); // because of https://bugs.openjdk.org/browse/JDK-8202759
83+
createFile(pathToTestfile);
84+
Mockito.verify(receiver, org.mockito.Mockito.timeout(MAX_MILLISECONDS_WAITING_OF_THREAD)).process(pathToTestfile);
85+
}
86+
87+
@Test
88+
public void testDontProcessDirectoryWithoutFiles() throws InterruptedException {
89+
final String pathToTestfile = pathToDirectory + SUBDIRECTORY_NAME;
90+
Thread.sleep(400); // because of https://bugs.openjdk.org/browse/JDK-8202759
91+
createFile(pathToTestfile);
92+
Mockito.verify(receiver, org.mockito.Mockito.timeout(MAX_MILLISECONDS_WAITING_OF_THREAD).times(0)).process(pathToTestfile);
93+
}
94+
95+
@Test
96+
public void testTriggerShutdown() throws InterruptedException {
97+
final String pathToTestfile = pathToDirectory + DirectoryListener.TRIGGER_SHUTDOWN_FILENAME;
98+
createFile(pathToTestfile);
99+
Mockito.verify(receiver, org.mockito.Mockito.timeout(MAX_MILLISECONDS_WAITING_OF_THREAD).times(0)).process(
100+
pathToDirectory);
101+
Thread.sleep(100);
102+
Assert.assertTrue(DIRECTORY_LISTENER.isClosed());
103+
}
104+
105+
private void createFile(final String path) {
106+
final File testFile = new File(path);
107+
try {
108+
testFile.createNewFile();
109+
}
110+
catch (final IOException e) {
111+
throw new RuntimeException(e);
112+
}
113+
}
114+
115+
private void createDirectory(final String dir) {
116+
try {
117+
Files.createDirectory(Path.of(dir));
118+
}
119+
catch (final IOException e) {
120+
throw new RuntimeException(e);
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)