Skip to content

Commit 29393f4

Browse files
fix: properly clean up threads when stopping Inotify. Improve Eventlet tests. (#1070)
* Improve cleaning up Inotify threads and add eventlet test cases. * Align SkipRepeatsQueue with Eventlet's Queue implementation. * Only run eventlet tests in Linux.
1 parent 4e9a86d commit 29393f4

File tree

9 files changed

+168
-37
lines changed

9 files changed

+168
-37
lines changed

src/watchdog/observers/inotify_c.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import ctypes.util
66
import errno
77
import os
8+
import select
89
import struct
910
import threading
1011
from ctypes import c_char_p, c_int, c_uint32
@@ -148,6 +149,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No
148149
Inotify._raise_error()
149150
self._inotify_fd = inotify_fd
150151
self._lock = threading.Lock()
152+
self._closed = False
153+
self._waiting_to_read = True
154+
self._kill_r, self._kill_w = os.pipe()
151155

152156
# Stores the watch descriptor for a given path.
153157
self._wd_for_path: dict[bytes, int] = {}
@@ -230,13 +234,19 @@ def remove_watch(self, path: bytes) -> None:
230234
def close(self) -> None:
231235
"""Closes the inotify instance and removes all associated watches."""
232236
with self._lock:
233-
if self._path in self._wd_for_path:
234-
wd = self._wd_for_path[self._path]
235-
inotify_rm_watch(self._inotify_fd, wd)
237+
if not self._closed:
238+
self._closed = True
236239

237-
# descriptor may be invalid because file was deleted
238-
with contextlib.suppress(OSError):
239-
os.close(self._inotify_fd)
240+
if self._path in self._wd_for_path:
241+
wd = self._wd_for_path[self._path]
242+
inotify_rm_watch(self._inotify_fd, wd)
243+
244+
if self._waiting_to_read:
245+
# inotify_rm_watch() should write data to _inotify_fd and wake
246+
# the thread, but writing to the kill channel will gaurentee this
247+
os.write(self._kill_w, b'!')
248+
else:
249+
self._close_resources()
240250

241251
def read_events(self, *, event_buffer_size: int = DEFAULT_EVENT_BUFFER_SIZE) -> list[InotifyEvent]:
242252
"""Reads events from inotify and yields them."""
@@ -276,6 +286,21 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:
276286
event_buffer = None
277287
while True:
278288
try:
289+
with self._lock:
290+
if self._closed:
291+
return []
292+
293+
self._waiting_to_read = True
294+
295+
select.select([self._inotify_fd, self._kill_r], [], [])
296+
297+
with self._lock:
298+
self._waiting_to_read = False
299+
300+
if self._closed:
301+
self._close_resources()
302+
return []
303+
279304
event_buffer = os.read(self._inotify_fd, event_buffer_size)
280305
except OSError as e:
281306
if e.errno == errno.EINTR:
@@ -340,6 +365,11 @@ def _recursive_simulate(src_path: bytes) -> list[InotifyEvent]:
340365

341366
return event_list
342367

368+
def _close_resources(self):
369+
os.close(self._inotify_fd)
370+
os.close(self._kill_r)
371+
os.close(self._kill_w)
372+
343373
# Non-synchronized methods.
344374
def _add_dir_watch(self, path: bytes, mask: int, *, recursive: bool) -> None:
345375
"""Adds a watch (optionally recursively) for the given directory path

src/watchdog/utils/bricks.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,13 @@ def _init(self, maxsize: int) -> None:
7272
super()._init(maxsize)
7373
self._last_item = None
7474

75-
def _put(self, item: Any) -> None:
75+
def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
7676
if self._last_item is None or item != self._last_item:
77-
super()._put(item)
78-
self._last_item = item
79-
else:
80-
# `put` increments `unfinished_tasks` even if we did not put
81-
# anything into the queue here
82-
self.unfinished_tasks -= 1
77+
super().put(item, block, timeout)
78+
79+
def _put(self, item: Any) -> None:
80+
super()._put(item)
81+
self._last_item = item
8382

8483
def _get(self) -> Any:
8584
item = super()._get()
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
if __name__ == '__main__':
2+
import eventlet
3+
4+
eventlet.monkey_patch()
5+
6+
import signal
7+
import sys
8+
import tempfile
9+
10+
from watchdog.observers import Observer
11+
from watchdog.events import LoggingEventHandler
12+
13+
with tempfile.TemporaryDirectory() as temp_dir:
14+
def run_observer():
15+
event_handler = LoggingEventHandler()
16+
observer = Observer()
17+
observer.schedule(event_handler, temp_dir)
18+
observer.start()
19+
eventlet.sleep(1)
20+
observer.stop()
21+
22+
def on_alarm(signum, frame):
23+
print("Observer.stop() never finished!", file=sys.stderr)
24+
sys.exit(1)
25+
26+
signal.signal(signal.SIGALRM, on_alarm)
27+
signal.alarm(4)
28+
29+
thread = eventlet.spawn(run_observer)
30+
thread.wait()
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
if __name__ == '__main__':
2+
import eventlet
3+
4+
eventlet.monkey_patch()
5+
6+
from watchdog.utils.bricks import SkipRepeatsQueue
7+
8+
q = SkipRepeatsQueue(10)
9+
q.put('A')
10+
q.put('A')
11+
q.put('A')
12+
q.put('A')
13+
q.put('B')
14+
q.put('A')
15+
16+
value = q.get()
17+
assert value == 'A'
18+
q.task_done()
19+
20+
assert q.unfinished_tasks == 2
21+
22+
value = q.get()
23+
assert value == 'B'
24+
q.task_done()
25+
26+
assert q.unfinished_tasks == 1
27+
28+
value = q.get()
29+
assert value == 'A'
30+
q.task_done()
31+
32+
assert q.empty()
33+
assert q.unfinished_tasks == 0

tests/markers.py

Lines changed: 0 additions & 7 deletions
This file was deleted.

tests/test_inotify_c.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import errno
1212
import logging
1313
import os
14+
import select
1415
import struct
1516
from typing import TYPE_CHECKING
1617
from unittest.mock import patch
@@ -56,6 +57,13 @@ def test_late_double_deletion(helper: Helper, p: P, event_queue: TestEventQueue,
5657
+ struct_inotify(wd=3, mask=const.IN_IGNORED)
5758
)
5859

60+
select_bkp = select.select
61+
62+
def fakeselect(read_list, *args, **kwargs):
63+
if inotify_fd in read_list:
64+
return [inotify_fd], [], []
65+
return select_bkp(read_list, *args, **kwargs)
66+
5967
os_read_bkp = os.read
6068

6169
def fakeread(fd, length):
@@ -92,8 +100,9 @@ def inotify_rm_watch(fd, wd):
92100
mock3 = patch.object(inotify_c, "inotify_init", new=inotify_init)
93101
mock4 = patch.object(inotify_c, "inotify_add_watch", new=inotify_add_watch)
94102
mock5 = patch.object(inotify_c, "inotify_rm_watch", new=inotify_rm_watch)
103+
mock6 = patch.object(select, "select", new=fakeselect)
95104

96-
with mock1, mock2, mock3, mock4, mock5:
105+
with mock1, mock2, mock3, mock4, mock5, mock6:
97106
start_watching(path=p(""))
98107
# Watchdog Events
99108
for evt_cls in [DirCreatedEvent, DirDeletedEvent] * 2:

tests/test_isolated.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import pytest
2+
import importlib
3+
4+
from watchdog.utils import platform
5+
6+
from .utils import run_isolated_test
7+
8+
9+
# Kqueue isn't supported by Eventlet, so BSD is out
10+
# Current usage ReadDirectoryChangesW on Windows is blocking, though async may be possible
11+
@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux")
12+
def test_observer_stops_in_eventlet():
13+
if not importlib.util.find_spec('eventlet'):
14+
pytest.skip("eventlet not installed")
15+
16+
run_isolated_test('eventlet_observer_stops.py')
17+
18+
19+
@pytest.mark.skipif(not platform.is_linux(), reason="Eventlet only supported in Linux")
20+
def test_eventlet_skip_repeat_queue():
21+
if not importlib.util.find_spec('eventlet'):
22+
pytest.skip("eventlet not installed")
23+
24+
run_isolated_test('eventlet_skip_repeat_queue.py')

tests/test_skip_repeats_queue.py

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
11
from __future__ import annotations
22

3-
import pytest
4-
53
from watchdog import events
64
from watchdog.utils.bricks import SkipRepeatsQueue
75

8-
from .markers import cpython_only
9-
106

11-
def basic_actions():
7+
def test_basic_queue():
128
q = SkipRepeatsQueue()
139

1410
e1 = (2, "fred")
@@ -25,10 +21,6 @@ def basic_actions():
2521
assert q.empty()
2622

2723

28-
def test_basic_queue():
29-
basic_actions()
30-
31-
3224
def test_allow_nonconsecutive():
3325
q = SkipRepeatsQueue()
3426

@@ -86,10 +78,3 @@ def test_consecutives_allowed_across_empties():
8678
q.put(e1) # this repeat is allowed because 'last' added is now gone from queue
8779
assert e1 == q.get()
8880
assert q.empty()
89-
90-
91-
@cpython_only
92-
def test_eventlet_monkey_patching():
93-
eventlet = pytest.importorskip("eventlet")
94-
eventlet.monkey_patch()
95-
basic_actions()

tests/utils.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
import dataclasses
44
import os
5+
import subprocess
6+
import sys
57
from queue import Queue
68
from typing import Protocol
79

@@ -97,3 +99,29 @@ def close(self) -> None:
9799
alive = [emitter.is_alive() for emitter in self.emitters]
98100
self.emitters = []
99101
assert alive == [False] * len(alive)
102+
103+
104+
def run_isolated_test(path):
105+
ISOALTED_TEST_PREFIX = os.path.join('tests', 'isolated')
106+
path = os.path.abspath(os.path.join(ISOALTED_TEST_PREFIX, path))
107+
108+
src_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), 'src')
109+
new_env = os.environ.copy()
110+
new_env['PYTHONPATH'] = os.pathsep.join(sys.path + [src_dir])
111+
112+
new_argv = [sys.executable, path]
113+
114+
p = subprocess.Popen(
115+
new_argv,
116+
env=new_env,
117+
)
118+
119+
# in case test goes haywire, don't let it run forever
120+
timeout = 10
121+
try:
122+
p.communicate(timeout=timeout)
123+
except subprocess.TimeoutExpired:
124+
p.kill()
125+
assert False, 'timed out'
126+
127+
assert p.returncode == 0

0 commit comments

Comments
 (0)