From 1384dab6128b0dd9ab1ce6d906e8077b74741691 Mon Sep 17 00:00:00 2001 From: Konstantin Ivanov Date: Fri, 4 Oct 2024 00:07:36 +0300 Subject: [PATCH 1/4] Added separate thread to clean dict with moved records, a delay added to make sure events remove after they can be used as a "source". Dict content changed from holding whole event to hold only src_path and time, since only src_path is used, time added to use it as a threshold to clean. --- src/watchdog/observers/inotify_c.py | 37 +++++++++++++++++++++---- tests/threaded/conftest.py | 20 ++++++++++++++ tests/threaded/test_inotify_c.py | 43 +++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 tests/threaded/conftest.py create mode 100644 tests/threaded/test_inotify_c.py diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index bb2b1aa67..f5d5c32c3 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -1,5 +1,6 @@ from __future__ import annotations +import time import contextlib import ctypes import ctypes.util @@ -167,7 +168,9 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No self._add_dir_watch(path, event_mask, recursive=recursive) else: self._add_watch(path, event_mask) - self._moved_from_events: dict[int, InotifyEvent] = {} + self._moved_from_events: dict[int, EventPathAndTime] = {} + thread_cleaner = threading.Thread(target=self.clear_move_records_older_than, args=(5,), daemon=True) + thread_cleaner.start() @property def event_mask(self) -> int: @@ -189,9 +192,18 @@ def fd(self) -> int: """The file descriptor associated with the inotify instance.""" return self._inotify_fd - def clear_move_records(self) -> None: - """Clear cached records of MOVED_FROM events""" - self._moved_from_events = {} + def clear_move_records_older_than(self, delay_sec: int) -> None: + """Clears cached records of MOVED_FROM events older than delay_sec.""" + sleep_max = 10 + while True: + if len(self._moved_from_events) != 0: + item =next(iter(self._moved_from_events)) + if self._moved_from_events[item].time < (time.time() - delay_sec): + del self._moved_from_events[item] + else: + time.sleep(delay_sec) + else: + time.sleep(sleep_max) def source_for_move(self, destination_event: InotifyEvent) -> bytes | None: """The source path corresponding to the given MOVED_TO event. @@ -208,8 +220,9 @@ def remember_move_from_event(self, event: InotifyEvent) -> None: """Save this event as the source event for future MOVED_TO events to reference. """ - self._moved_from_events[event.cookie] = event - + path_and_time = EventPathAndTime(event.src_path) + self._moved_from_events[event.cookie] = path_and_time + def add_watch(self, path: bytes) -> None: """Adds a watch for the given path. @@ -586,3 +599,15 @@ def __repr__(self) -> str: f" mask={self._get_mask_string(self.mask)}, cookie={self.cookie}," f" name={os.fsdecode(self.name)!r}>" ) + +class EventPathAndTime: + def __init__(self, src_path: bytes) -> None: + self._time = time.time() + self._src_path = src_path + + @property + def src_path(self) -> bytes: + return self._src_path + @property + def time(self) -> float: + return self._time \ No newline at end of file diff --git a/tests/threaded/conftest.py b/tests/threaded/conftest.py new file mode 100644 index 000000000..26cb0c3f8 --- /dev/null +++ b/tests/threaded/conftest.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import contextlib +import gc +import os +import threading +from functools import partial + +import pytest + +from tests.utils import ExpectEvent, Helper, P, StartWatching, TestEventQueue + + +@pytest.fixture(autouse=True) +def _no_thread_leaks(): + """ + A fixture override, disables thread counter from parent folder + """ + + diff --git a/tests/threaded/test_inotify_c.py b/tests/threaded/test_inotify_c.py new file mode 100644 index 000000000..c0e0b4c39 --- /dev/null +++ b/tests/threaded/test_inotify_c.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +import pytest + +from watchdog.utils import platform + +if not platform.is_linux(): + pytest.skip("GNU/Linux only.", allow_module_level=True) + +import ctypes +import errno +import logging +import os +import select +import struct +from typing import TYPE_CHECKING +from unittest.mock import patch + +from watchdog.events import DirCreatedEvent, DirDeletedEvent, DirModifiedEvent +from watchdog.observers.inotify_c import Inotify, InotifyConstants, InotifyEvent + +if TYPE_CHECKING: + from ..utils import Helper, P, StartWatching, TestEventQueue + +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + + + + +def test_watch_file_move(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: + folder = p() + path = p("this_is_a_file") + path_moved = p("this_is_a_file2") + with open(path, "a"): + pass + start_watching(path=folder) + os.rename(path, path_moved) + event, _ = event_queue.get(timeout=5) + assert event.src_path == path + assert event.dest_path == path_moved + assert repr(event) + From fa810e5f19221359e70db56cfa5aaa4f2d525caf Mon Sep 17 00:00:00 2001 From: Konstantin Ivanov Date: Sun, 6 Oct 2024 00:45:10 +0300 Subject: [PATCH 2/4] Redone with no-demon thread as current pytest fixtures do not like these Extra test folder deleted On branch fix_moved_records_not_cleaned Changes to be committed: modified: src/watchdog/observers/inotify_c.py modified: tests/test_inotify_c.py deleted: tests/threaded/conftest.py deleted: tests/threaded/test_inotify_c.py --- src/watchdog/observers/inotify_c.py | 27 +++++++++++++++--- tests/test_inotify_c.py | 12 ++++++++ tests/threaded/conftest.py | 20 -------------- tests/threaded/test_inotify_c.py | 43 ----------------------------- 4 files changed, 35 insertions(+), 67 deletions(-) delete mode 100644 tests/threaded/conftest.py delete mode 100644 tests/threaded/test_inotify_c.py diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index f5d5c32c3..5d4578e46 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -1,5 +1,6 @@ from __future__ import annotations +import atexit import time import contextlib import ctypes @@ -9,8 +10,10 @@ import select import struct import threading +import traceback from ctypes import c_char_p, c_int, c_uint32 from functools import reduce +from threading import Thread from typing import TYPE_CHECKING from watchdog.utils import UnsupportedLibcError @@ -143,8 +146,14 @@ class Inotify: ``True`` if subdirectories should be monitored; ``False`` otherwise. """ + def g(self): + for line in traceback.format_stack(): + print(line.strip()) + def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | None = None) -> None: + # The file descriptor associated with the inotify instance. + inotify_fd = inotify_init() if inotify_fd == -1: Inotify._raise_error() @@ -168,9 +177,12 @@ def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | No self._add_dir_watch(path, event_mask, recursive=recursive) else: self._add_watch(path, event_mask) + self._moved_from_events: dict[int, EventPathAndTime] = {} - thread_cleaner = threading.Thread(target=self.clear_move_records_older_than, args=(5,), daemon=True) - thread_cleaner.start() + self.stop_event = threading.Event() + self.cleaner_thread = threading.Thread(target=self.clear_move_records_older_than, args=(5,self.stop_event)) + self.cleaner_thread.start() + @property def event_mask(self) -> int: @@ -192,10 +204,10 @@ def fd(self) -> int: """The file descriptor associated with the inotify instance.""" return self._inotify_fd - def clear_move_records_older_than(self, delay_sec: int) -> None: + def clear_move_records_older_than(self, delay_sec: int, stop_event: threading.Event) -> None: """Clears cached records of MOVED_FROM events older than delay_sec.""" sleep_max = 10 - while True: + while not stop_event.is_set(): if len(self._moved_from_events) != 0: item =next(iter(self._moved_from_events)) if self._moved_from_events[item].time < (time.time() - delay_sec): @@ -205,6 +217,10 @@ def clear_move_records_older_than(self, delay_sec: int) -> None: else: time.sleep(sleep_max) + def stop_cleaner(self): + self.stop_event.set() + self.cleaner_thread.join() + def source_for_move(self, destination_event: InotifyEvent) -> bytes | None: """The source path corresponding to the given MOVED_TO event. @@ -246,9 +262,11 @@ def remove_watch(self, path: bytes) -> None: def close(self) -> None: """Closes the inotify instance and removes all associated watches.""" + #self.g() with self._lock: if not self._closed: self._closed = True + self.stop_cleaner() if self._path in self._wd_for_path: wd = self._wd_for_path[self._path] @@ -461,6 +479,7 @@ def _parse_event_buffer(event_buffer: bytes) -> Generator[tuple[int, int, int, b yield wd, mask, cookie, name + class InotifyEvent: """Inotify event struct wrapper. diff --git a/tests/test_inotify_c.py b/tests/test_inotify_c.py index 8d4b59d40..2bc29de48 100644 --- a/tests/test_inotify_c.py +++ b/tests/test_inotify_c.py @@ -157,6 +157,18 @@ def test_watch_file(p: P, event_queue: TestEventQueue, start_watching: StartWatc event, _ = event_queue.get(timeout=5) assert repr(event) +def test_watch_file_move(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: + folder = p() + path = p("this_is_a_file") + path_moved = p("this_is_a_file2") + with open(path, "a"): + pass + start_watching(path=folder) + os.rename(path, path_moved) + event, _ = event_queue.get(timeout=5) + assert event.src_path == path + assert event.dest_path == path_moved + assert repr(event) def test_event_equality(p: P) -> None: wd_parent_dir = 42 diff --git a/tests/threaded/conftest.py b/tests/threaded/conftest.py deleted file mode 100644 index 26cb0c3f8..000000000 --- a/tests/threaded/conftest.py +++ /dev/null @@ -1,20 +0,0 @@ -from __future__ import annotations - -import contextlib -import gc -import os -import threading -from functools import partial - -import pytest - -from tests.utils import ExpectEvent, Helper, P, StartWatching, TestEventQueue - - -@pytest.fixture(autouse=True) -def _no_thread_leaks(): - """ - A fixture override, disables thread counter from parent folder - """ - - diff --git a/tests/threaded/test_inotify_c.py b/tests/threaded/test_inotify_c.py deleted file mode 100644 index c0e0b4c39..000000000 --- a/tests/threaded/test_inotify_c.py +++ /dev/null @@ -1,43 +0,0 @@ -from __future__ import annotations - -import pytest - -from watchdog.utils import platform - -if not platform.is_linux(): - pytest.skip("GNU/Linux only.", allow_module_level=True) - -import ctypes -import errno -import logging -import os -import select -import struct -from typing import TYPE_CHECKING -from unittest.mock import patch - -from watchdog.events import DirCreatedEvent, DirDeletedEvent, DirModifiedEvent -from watchdog.observers.inotify_c import Inotify, InotifyConstants, InotifyEvent - -if TYPE_CHECKING: - from ..utils import Helper, P, StartWatching, TestEventQueue - -logging.basicConfig(level=logging.DEBUG) -logger = logging.getLogger(__name__) - - - - -def test_watch_file_move(p: P, event_queue: TestEventQueue, start_watching: StartWatching) -> None: - folder = p() - path = p("this_is_a_file") - path_moved = p("this_is_a_file2") - with open(path, "a"): - pass - start_watching(path=folder) - os.rename(path, path_moved) - event, _ = event_queue.get(timeout=5) - assert event.src_path == path - assert event.dest_path == path_moved - assert repr(event) - From 1ee2f1476da73e04887e92ce7be82dc91c86d120 Mon Sep 17 00:00:00 2001 From: Konstantin Ivanov Date: Sun, 6 Oct 2024 01:15:07 +0300 Subject: [PATCH 3/4] Cleaning up debug info --- src/watchdog/observers/inotify_c.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index 5d4578e46..5e2ab3cbc 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -1,6 +1,5 @@ from __future__ import annotations -import atexit import time import contextlib import ctypes @@ -10,10 +9,8 @@ import select import struct import threading -import traceback from ctypes import c_char_p, c_int, c_uint32 from functools import reduce -from threading import Thread from typing import TYPE_CHECKING from watchdog.utils import UnsupportedLibcError @@ -146,14 +143,8 @@ class Inotify: ``True`` if subdirectories should be monitored; ``False`` otherwise. """ - def g(self): - for line in traceback.format_stack(): - print(line.strip()) - def __init__(self, path: bytes, *, recursive: bool = False, event_mask: int | None = None) -> None: - # The file descriptor associated with the inotify instance. - inotify_fd = inotify_init() if inotify_fd == -1: Inotify._raise_error() @@ -479,7 +470,6 @@ def _parse_event_buffer(event_buffer: bytes) -> Generator[tuple[int, int, int, b yield wd, mask, cookie, name - class InotifyEvent: """Inotify event struct wrapper. From 6c1c77a51435d7ae600a30eea672fa40b3b5f2e4 Mon Sep 17 00:00:00 2001 From: Konstantin Ivanov Date: Sun, 6 Oct 2024 01:20:10 +0300 Subject: [PATCH 4/4] Cleaning up debug info Changes to be committed: modified: src/watchdog/observers/inotify_c.py --- src/watchdog/observers/inotify_c.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/watchdog/observers/inotify_c.py b/src/watchdog/observers/inotify_c.py index 5e2ab3cbc..9853ca2cb 100644 --- a/src/watchdog/observers/inotify_c.py +++ b/src/watchdog/observers/inotify_c.py @@ -253,7 +253,6 @@ def remove_watch(self, path: bytes) -> None: def close(self) -> None: """Closes the inotify instance and removes all associated watches.""" - #self.g() with self._lock: if not self._closed: self._closed = True