8 changes: 7 additions & 1 deletion .github/workflows/tests.yml
@@ -86,5 +86,11 @@ jobs:
- name: Install dependencies
run: python -m pip install tox

# TODO: remove when the _fsevents C extension declares support for
# running without the GIL
- name: Force-disable the GIL on Mac
if: ${{ endsWith(matrix.python, 't') && matrix.os.matrix == 'macos' }}
run: echo "PYTHON_GIL=0" >> $GITHUB_ENV

- name: Run tests
run: python -m tox -q -e py
run: python -m tox -q -e py -- --timeout=120
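
For context, PYTHON_GIL=0 forces the GIL off on a free-threaded build even though the _fsevents extension does not yet declare support for running without it. If it is ever unclear whether the variable took effect on a runner, the interpreter can report its own GIL state. A minimal sketch, not part of the workflow change; the introspection hooks below only exist on CPython 3.13+:

```python
# Minimal check of whether this build is free-threaded and whether the GIL is
# currently active. Assumes CPython 3.13+; sys._is_gil_enabled() does not
# exist on older versions, hence the hasattr guard.
import sys
import sysconfig

free_threaded = bool(sysconfig.get_config_var("Py_GIL_DISABLED"))
print("Free-threaded build:", free_threaded)

if hasattr(sys, "_is_gil_enabled"):
    # On a free-threaded build started with PYTHON_GIL=0, this prints False.
    print("GIL enabled:", sys._is_gil_enabled())
```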
38 changes: 24 additions & 14 deletions src/watchdog/observers/api.py
@@ -189,6 +189,7 @@ def __init__(self, *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__()
self._event_queue = EventQueue()
self._timeout = timeout
self._lock = threading.RLock()

@property
def timeout(self) -> float:
@@ -222,6 +223,9 @@ def dispatch_events(self, event_queue: EventQueue) -> None:

def run(self) -> None:
while self.should_keep_running():
with self._lock:
if not self.should_keep_running():
return
try:
self.dispatch_events(self.event_queue)
except queue.Empty:
@@ -234,7 +238,6 @@ class BaseObserver(EventDispatcher):
def __init__(self, emitter_class: type[EventEmitter], *, timeout: float = DEFAULT_OBSERVER_TIMEOUT) -> None:
super().__init__(timeout=timeout)
self._emitter_class = emitter_class
self._lock = threading.RLock()
self._watches: set[ObservedWatch] = set()
self._handlers: defaultdict[ObservedWatch, set[FileSystemEventHandler]] = defaultdict(set)
self._emitters: set[EventEmitter] = set()
@@ -272,12 +275,15 @@ def emitters(self) -> set[EventEmitter]:
return self._emitters

def start(self) -> None:
for emitter in self._emitters.copy():
try:
emitter.start()
except Exception:
self._remove_emitter(emitter)
raise
with self._lock:
for emitter in self._emitters.copy():
if not self.should_keep_running():
break
try:
emitter.start()
except Exception:
self._remove_emitter(emitter)
raise
super().start()

def schedule(
@@ -390,17 +396,21 @@ def on_thread_stop(self) -> None:
self.unschedule_all()

def dispatch_events(self, event_queue: EventQueue) -> None:
with self._lock:
if not self.should_keep_running():
return
entry = event_queue.get(block=True)
if entry is EventDispatcher.stop_event:
return

event, watch = entry

with self._lock:
# To allow unschedule/stop and safe removal of event handlers
# within event handlers itself, check if the handler is still
# registered after every dispatch.
for handler in self._handlers[watch].copy():
if handler in self._handlers[watch]:
handler.dispatch(event)
# To allow unschedule/stop and safe removal of event handlers
# within event handlers itself, check if the handler is still
# registered after every dispatch.
for handler in self._handlers[watch].copy():
Contributor:

Is .copy() thread-safe? Otherwise it should likely be guarded by a lock as well.

Contributor Author:

I rearranged so the lock is acquired in the for loop and only gets released during blocking calls, to avoid deadlocks.

Contributor Author:

Actually, I checked again and it looks like I can leave the locking as it was before, so never mind.

with self._lock:
if handler not in self._handlers[watch]:
continue
handler.dispatch(event)
Contributor:

There may still be a race condition here: at this point, the handler could have been removed since the check on the previous line.

event_queue.task_done()
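
Read as a whole, the new dispatch_events blocks on event_queue.get() with the lock released and then takes the lock once per handler, re-checking registration just before each dispatch. A stripped-down, self-contained sketch of that pattern follows; the Dispatcher/Handler names and the demo under __main__ are invented for illustration and are not watchdog's API:

```python
"""Sketch of the per-handler locking pattern used in dispatch_events above.

Only the lock discipline matters here: block on the queue with the lock
released, then re-check handler registration under the lock before each
individual dispatch.
"""
import queue
import threading


class Handler:
    def __init__(self, name: str) -> None:
        self.name = name

    def dispatch(self, event: str) -> None:
        print(f"{self.name} got {event}")


class Dispatcher:
    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._handlers: set[Handler] = set()
        self.event_queue: queue.Queue[str] = queue.Queue()

    def add_handler(self, handler: Handler) -> None:
        with self._lock:
            self._handlers.add(handler)

    def remove_handler(self, handler: Handler) -> None:
        with self._lock:
            self._handlers.discard(handler)

    def dispatch_events(self) -> None:
        # Block with the lock *released*, so other threads can add or remove
        # handlers (or stop the dispatcher) while we wait for an event.
        event = self.event_queue.get(block=True)

        # Snapshot taken without the lock, mirroring the diff above; handlers
        # removed while the lock is released between iterations are skipped by
        # the membership re-check inside the loop.
        for handler in set(self._handlers):
            with self._lock:
                if handler not in self._handlers:
                    continue
                handler.dispatch(event)

        self.event_queue.task_done()


if __name__ == "__main__":
    d = Dispatcher()
    a, b = Handler("a"), Handler("b")
    d.add_handler(a)
    d.add_handler(b)
    d.event_queue.put("created")
    d.remove_handler(b)   # b is gone before dispatch, so only "a" sees the event
    d.dispatch_events()
```

Whether the unsynchronized snapshot (.copy() outside the lock) is safe on a free-threaded build is exactly what the first review comment above asks; the sketch keeps the same shape as the diff rather than settling that question.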
6 changes: 4 additions & 2 deletions src/watchdog/observers/read_directory_changes.py
@@ -58,8 +58,10 @@ def start(self) -> None:
sleep(0.01)

def on_thread_stop(self) -> None:
if self._whandle:
close_directory_handle(self._whandle)
whandle = self._whandle
if whandle:
self._whandle = None
Comment on lines +61 to +63
Contributor:

I'm not sure I understand what race this is guarding against, but it seems like it would be better to have a lock around this.

Contributor Author:

This is guarding against the file handle re-use crash described here. Let me see if locking also works, since that's a lot more explicit.

Contributor Author:

I tried adding explicit locking, but that deadlocks in test_emitter.py::test_delete_self. The main thread tries to join the emitter thread, but that is blocked on the observer closing the emitter's file handle.

close_directory_handle(whandle)

def _read_events(self) -> list[WinAPINativeEvent]:
if not self._whandle:
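
The change above follows a steal-then-release idiom: take a local reference, clear self._whandle first, and only then close the handle, so no other code path can observe a handle value that is about to be closed and possibly re-used by the OS. A minimal self-contained sketch with a dummy resource; DummyHandle and Watcher are invented names, not watchdog's:

```python
"""Sketch of the swap-then-close idiom used in on_thread_stop above."""
from __future__ import annotations


class DummyHandle:
    def __init__(self, value: int) -> None:
        self.value = value
        self.closed = False

    def close(self) -> None:
        self.closed = True


class Watcher:
    def __init__(self) -> None:
        self._whandle: DummyHandle | None = DummyHandle(42)

    def stop(self) -> None:
        # Steal the reference and clear the shared attribute *before* closing,
        # so a concurrent reader of self._whandle sees None rather than a
        # handle that is about to be (or already has been) closed.
        whandle = self._whandle
        if whandle:
            self._whandle = None
            whandle.close()

    def read_events(self) -> list[str]:
        # Mirrors _read_events above: bail out if the handle is already gone.
        if not self._whandle:
            return []
        return [f"event from handle {self._whandle.value}"]


if __name__ == "__main__":
    w = Watcher()
    print(w.read_events())  # ['event from handle 42']
    w.stop()
    print(w.read_events())  # [] -- the handle was cleared before being closed
```

As the author notes above, wrapping this in an explicit lock deadlocks test_emitter.py::test_delete_self, which is why the lighter attribute swap is used instead.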