diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index d4784012..da0acc2d 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -87,4 +87,6 @@ jobs: run: python -m pip install tox - name: Run tests - run: python -m tox -q -e py + # use a timeout to prevent CI hangs caused by + # deadlocks due to threading bugs + run: python -m tox -q -e py -- --timeout=120 diff --git a/src/watchdog/observers/api.py b/src/watchdog/observers/api.py index db9feef9..93839677 100644 --- a/src/watchdog/observers/api.py +++ b/src/watchdog/observers/api.py @@ -189,6 +189,7 @@ def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__() self._event_queue = EventQueue() self._timeout = timeout + self._lock = threading.RLock() @property def timeout(self) -> float: @@ -222,6 +223,9 @@ def dispatch_events(self, event_queue: EventQueue) -> None: def run(self) -> None: while self.should_keep_running(): + with self._lock: + if not self.should_keep_running(): + return try: self.dispatch_events(self.event_queue) except queue.Empty: @@ -234,7 +238,6 @@ class BaseObserver(EventDispatcher): def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None: super().__init__(timeout=timeout) self._emitter_class = emitter_class - self._lock = threading.RLock() self._watches: set[ObservedWatch] = set() self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set) self._emitters: set[EventEmitter] = set() @@ -272,12 +275,15 @@ def emitters(self) -> set[EventEmitter]: return self._emitters def start(self) -> None: - for emitter in self._emitters.copy(): - try: - emitter.start() - except Exception: - self._remove_emitter(emitter) - raise + with self._lock: + for emitter in self._emitters.copy(): + if not self.should_keep_running(): + break + try: + emitter.start() + except Exception: + self._remove_emitter(emitter) + raise super().start() def schedule( @@ -390,6 +396,9 @@ def on_thread_stop(self) -> None: self.unschedule_all() def dispatch_events(self, event_queue: EventQueue) -> None: + with self._lock: + if not self.should_keep_running(): + return entry = event_queue.get(block=True) if entry is EventDispatcher.stop_event: return diff --git a/src/watchdog/observers/read_directory_changes.py b/src/watchdog/observers/read_directory_changes.py index 4faa9450..6ba852f0 100644 --- a/src/watchdog/observers/read_directory_changes.py +++ b/src/watchdog/observers/read_directory_changes.py @@ -58,8 +58,10 @@ def start(self) -> None: sleep(0.01) def on_thread_stop(self) -> None: - if self._whandle: - close_directory_handle(self._whandle) + whandle = self._whandle + if whandle: + self._whandle = None + close_directory_handle(whandle) def _read_events(self) -> list[WinAPINativeEvent]: if not self._whandle: diff --git a/src/watchdog/observers/winapi.py b/src/watchdog/observers/winapi.py index 3eb0493a..c54a7186 100644 --- a/src/watchdog/observers/winapi.py +++ b/src/watchdog/observers/winapi.py @@ -293,9 +293,12 @@ def _generate_observed_path_deleted_event() -> tuple[bytes, int]: return buff.raw, event_size +OPEN_HANDLES = set() + + def get_directory_handle(path: str) -> HANDLE: """Returns a Windows handle to the specified directory path.""" - return CreateFileW( + handle = CreateFileW( path, FILE_LIST_DIRECTORY, WATCHDOG_FILE_SHARE_FLAGS, @@ -304,9 +307,16 @@ def get_directory_handle(path: str) -> HANDLE: WATCHDOG_FILE_FLAGS, None, ) + OPEN_HANDLES.add(handle) + return handle def close_directory_handle(handle: HANDLE) -> None: + try: + OPEN_HANDLES.remove(handle) + except KeyError: + # another thread already closed the handle + return try: CancelIoEx(handle, None) # force ReadDirectoryChangesW to return CloseHandle(handle) diff --git a/src/watchdog/utils/__init__.py b/src/watchdog/utils/__init__.py index 2fe37eb6..f05497bc 100644 --- a/src/watchdog/utils/__init__.py +++ b/src/watchdog/utils/__init__.py @@ -56,6 +56,9 @@ def on_thread_stop(self) -> None: :meth:`stop()` calls this method. This method is called immediately after the thread is signaled to stop. + + Note that this can be called concurrently with on_thread_start, + so tear-down of state may need synchronization. """ def stop(self) -> None: @@ -69,10 +72,20 @@ def on_thread_start(self) -> None: This method is called right before this thread is started and this object's run() method is invoked. + + Note that this can be called concurrently with on_thread_stop, + so any set-up of state may need synchronization. """ def start(self) -> None: + if self._stopped_event.is_set(): + # stop was called before start, so don't start + return self.on_thread_start() + if self._stopped_event.is_set(): + # stop was called while we were doing setup, + # so don't actually spawn a thread + return threading.Thread.start(self) diff --git a/tests/conftest.py b/tests/conftest.py index b0f4ce52..bce2b07a 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,6 +1,7 @@ from __future__ import annotations import contextlib +import faulthandler import gc import os import threading @@ -27,10 +28,15 @@ def _no_thread_leaks(): We do not use pytest-threadleak because it is not reliable. """ old_thread_count = threading.active_count() + yield - gc.collect() # Clear the stuff from other function-level fixtures - assert threading.active_count() == old_thread_count # Only previously existing threads + # Clear the stuff from other function-level fixtures + gc.collect() + + # Only previously existing threads, allowing for threads that were + # being cleaned up while this test was starting + assert threading.active_count() <= old_thread_count @pytest.fixture(autouse=True) def _no_warnings(recwarn):