Skip to content

Commit 9319b47

Browse files
committed
refactor: use watchfiles for directory monitor
1 parent 23487b7 commit 9319b47

3 files changed

Lines changed: 351 additions & 113 deletions

File tree

app/monitor.py

Lines changed: 212 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
import threading
55
import time
66
import traceback
7+
from dataclasses import dataclass
78
from pathlib import Path
89
from threading import Lock
910
from typing import Any, Optional, Dict, List
1011

1112
from apscheduler.schedulers.background import BackgroundScheduler
12-
from watchdog.events import FileSystemEventHandler, FileSystemMovedEvent, FileSystemEvent
13-
from watchdog.observers.polling import PollingObserver
13+
from watchfiles import Change, DefaultFilter, watch
1414

1515
from app.chain import ChainBase
1616
from app.chain.storage import StorageChain
@@ -34,29 +34,195 @@ class MonitorChain(ChainBase):
3434
pass
3535

3636

37-
class FileMonitorHandler(FileSystemEventHandler):
37+
@dataclass(frozen=True)
38+
class DirectoryChangeEvent:
3839
"""
39-
目录监控响应类
40+
目录文件变化事件,隔离底层 watchfiles 事件结构。
4041
"""
42+
change_type: Change
43+
src_path: str
44+
is_directory: bool
4145

42-
def __init__(self, mon_path: Path, callback: Any, **kwargs):
43-
super(FileMonitorHandler, self).__init__(**kwargs)
46+
47+
class LocalDirectoryWatcher:
48+
"""
49+
基于 watchfiles 的本地目录监控线程。
50+
"""
51+
_HANDLE_CHANGES = {Change.added, Change.modified}
52+
53+
def __init__(self, mon_path: Path, callback: Any, force_polling: Optional[bool] = None):
54+
"""
55+
初始化本地目录监控。
56+
:param mon_path: 监控目录
57+
:param callback: 目录变化回调对象
58+
:param force_polling: 是否强制使用轮询模式,None 表示由 watchfiles 自动选择
59+
"""
4460
self._watch_path = mon_path
45-
self.callback = callback
61+
self._callback = callback
62+
self._force_polling = force_polling
63+
self._stop_event = threading.Event()
64+
self._thread: Optional[threading.Thread] = None
65+
self._watch_filter = DefaultFilter()
4666

47-
def on_created(self, event: FileSystemEvent):
67+
@property
68+
def watch_path(self) -> Path:
69+
"""
70+
获取监控目录。
71+
:return: 监控目录
72+
"""
73+
return self._watch_path
74+
75+
def start(self):
76+
"""
77+
启动本地目录监控线程。
78+
"""
79+
if not self._watch_path.exists():
80+
raise FileNotFoundError(f"监控目录不存在: {self._watch_path}")
81+
if not self._watch_path.is_dir():
82+
raise NotADirectoryError(f"监控路径不是目录: {self._watch_path}")
83+
if self.is_alive():
84+
logger.info(f"本地目录监控已在运行中: {self._watch_path}")
85+
return
86+
self._stop_event.clear()
87+
self._thread = threading.Thread(
88+
target=self._run,
89+
name=f"MoviePilot-DirectoryWatcher-{self._watch_path.name}",
90+
daemon=True
91+
)
92+
self._thread.start()
93+
94+
def stop(self):
95+
"""
96+
请求停止本地目录监控线程。
97+
"""
98+
self._stop_event.set()
99+
100+
def join(self, timeout: Optional[float] = None):
101+
"""
102+
等待本地目录监控线程退出。
103+
:param timeout: 最长等待秒数
104+
"""
105+
if self._thread:
106+
self._thread.join(timeout=timeout)
107+
108+
def is_alive(self) -> bool:
109+
"""
110+
判断监控线程是否仍在运行。
111+
:return: 线程存活状态
112+
"""
113+
return bool(self._thread and self._thread.is_alive())
114+
115+
def _run(self):
116+
"""
117+
运行 watchfiles 主循环,并在快速模式不可用时回退到轮询。
118+
"""
48119
try:
49-
self.callback.event_handler(event=event, text="创建", event_path=event.src_path,
50-
file_size=Path(event.src_path).stat().st_size)
51-
except Exception as e:
52-
logger.error(f"on_created 异常: {e}")
120+
self._run_watch(force_polling=self._force_polling)
121+
except Exception as err:
122+
if self._stop_event.is_set():
123+
return
124+
if self._force_polling is True:
125+
logger.error(f"本地目录监控发生错误: {self._watch_path} - {err}")
126+
logger.debug(traceback.format_exc())
127+
return
128+
logger.warn(f"快速模式监控 {self._watch_path} 失败,将自动切换到兼容模式: {err}")
129+
try:
130+
self._run_watch(force_polling=True)
131+
except Exception as fallback_err:
132+
if not self._stop_event.is_set():
133+
logger.error(f"兼容模式监控 {self._watch_path} 仍然失败: {fallback_err}")
134+
logger.debug(traceback.format_exc())
135+
136+
def _run_watch(self, force_polling: Optional[bool]):
137+
"""
138+
执行一次 watchfiles 监控循环。
139+
:param force_polling: 是否强制轮询
140+
"""
141+
for changes in watch(
142+
str(self._watch_path),
143+
watch_filter=self._watch_filter,
144+
stop_event=self._stop_event,
145+
rust_timeout=1000,
146+
yield_on_timeout=True,
147+
force_polling=force_polling,
148+
recursive=True,
149+
ignore_permission_denied=True):
150+
if self._stop_event.is_set():
151+
break
152+
if not changes:
153+
continue
154+
self._handle_changes(changes)
155+
156+
def _handle_changes(self, changes: set[tuple[Change, str]]):
157+
"""
158+
将 watchfiles 原始变更转换为目录监控事件。
159+
:param changes: watchfiles 返回的变更集合
160+
"""
161+
for change_type, path_str in sorted(changes, key=lambda item: item[1]):
162+
if change_type not in self._HANDLE_CHANGES:
163+
continue
164+
event_path = Path(path_str)
165+
event = self._build_event(change_type=change_type, event_path=event_path)
166+
if not event or event.is_directory:
167+
continue
168+
file_size = self._get_file_size(event_path)
169+
if file_size is None:
170+
continue
171+
text = self._change_text(change_type)
172+
try:
173+
self._callback.event_handler(
174+
event=event,
175+
text=text,
176+
event_path=path_str,
177+
file_size=file_size
178+
)
179+
except Exception as err:
180+
logger.error(f"处理本地目录监控事件失败: {path_str} - {err}")
181+
182+
@staticmethod
183+
def _build_event(change_type: Change, event_path: Path) -> Optional[DirectoryChangeEvent]:
184+
"""
185+
构建目录变化事件,路径已不存在时忽略。
186+
:param change_type: watchfiles 变化类型
187+
:param event_path: 变化路径
188+
:return: 目录变化事件
189+
"""
190+
try:
191+
is_directory = event_path.is_dir()
192+
except OSError as err:
193+
logger.debug(f"读取目录监控事件路径失败: {event_path} - {err}")
194+
return None
195+
if not event_path.exists():
196+
return None
197+
return DirectoryChangeEvent(
198+
change_type=change_type,
199+
src_path=event_path.as_posix(),
200+
is_directory=is_directory
201+
)
53202

54-
def on_moved(self, event: FileSystemMovedEvent):
203+
@staticmethod
204+
def _get_file_size(event_path: Path) -> Optional[int]:
205+
"""
206+
读取事件文件大小,文件已消失时返回 None。
207+
:param event_path: 事件文件路径
208+
:return: 文件大小
209+
"""
55210
try:
56-
self.callback.event_handler(event=event, text="移动", event_path=event.dest_path,
57-
file_size=Path(event.dest_path).stat().st_size)
58-
except Exception as e:
59-
logger.error(f"on_moved 异常: {e}")
211+
return event_path.stat().st_size
212+
except OSError as err:
213+
logger.debug(f"读取目录监控文件大小失败: {event_path} - {err}")
214+
return None
215+
216+
@staticmethod
217+
def _change_text(change_type: Change) -> str:
218+
"""
219+
转换 watchfiles 事件类型为日志文案。
220+
:param change_type: watchfiles 变化类型
221+
:return: 事件描述
222+
"""
223+
if change_type == Change.modified:
224+
return "修改"
225+
return "新增"
60226

61227

62228
class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
@@ -67,10 +233,8 @@ class Monitor(ConfigReloadMixin, metaclass=SingletonClass):
67233

68234
def __init__(self):
69235
super().__init__()
70-
# 退出事件
71-
self._event = threading.Event()
72-
# 监控服务
73-
self._observers = []
236+
# 本地目录监控服务
237+
self._watchers = []
74238
# 定时服务
75239
self._scheduler = None
76240
# 存储过照间隔(分钟)
@@ -435,32 +599,25 @@ def init(self):
435599
limits=limits)
436600
logger.info(f"监控模式决策: {reason}")
437601

438-
if use_polling:
439-
observer = PollingObserver()
440-
logger.info(f"使用兼容模式(轮询)监控 {mon_path}")
441-
else:
442-
observer = self.__choose_observer()
443-
if observer is None:
444-
logger.warn(f"快速模式不可用,自动切换到兼容模式监控 {mon_path}")
445-
observer = PollingObserver()
446-
else:
447-
logger.info(f"使用快速模式监控 {mon_path}")
448-
if limits['warnings']:
449-
for warning in limits['warnings']:
450-
logger.warn(f"系统限制警告: {warning}")
451-
if limits['max_user_watches'] > 0:
452-
usage_percent = (file_count / limits['max_user_watches']) * 100
453-
logger.info(
454-
f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})")
455-
456-
self._observers.append(observer)
457-
observer.schedule(FileMonitorHandler(mon_path=mon_path, callback=self),
458-
path=str(mon_path),
459-
recursive=True)
460-
observer.daemon = True
461-
observer.start()
462-
463602
mode_name = "兼容模式(轮询)" if use_polling else "快速模式"
603+
logger.info(f"使用{mode_name}监控 {mon_path}")
604+
if not use_polling:
605+
if limits['warnings']:
606+
for warning in limits['warnings']:
607+
logger.warn(f"系统限制警告: {warning}")
608+
if limits['max_user_watches'] > 0:
609+
usage_percent = (file_count / limits['max_user_watches']) * 100
610+
logger.info(
611+
f"系统监控资源使用率: {usage_percent:.1f}% ({file_count}/{limits['max_user_watches']})")
612+
613+
watcher = LocalDirectoryWatcher(
614+
mon_path=mon_path,
615+
callback=self,
616+
force_polling=True if use_polling else None
617+
)
618+
self._watchers.append(watcher)
619+
watcher.start()
620+
464621
logger.info(f"✓ 本地目录监控已启动: {mon_path} [{mode_name}]")
465622

466623
except Exception as e:
@@ -521,64 +678,6 @@ def init(self):
521678
remote_count = len([d for d in monitor_dirs if d.storage != "local" and d.monitor_type == "monitor"])
522679
logger.info(f"目录监控启动完成: 本地监控 {local_count} 个,远程监控 {remote_count} 个")
523680

524-
def __choose_observer(self) -> Optional[Any]:
525-
"""
526-
选择最优的监控模式(带错误处理和自动回退)
527-
"""
528-
system = platform.system()
529-
530-
observers_to_try = []
531-
532-
try:
533-
if system == 'Linux':
534-
observers_to_try = [
535-
('InotifyObserver',
536-
lambda: self.__try_import_observer('watchdog.observers.inotify', 'InotifyObserver')),
537-
]
538-
elif system == 'Darwin':
539-
observers_to_try = [
540-
('FSEventsObserver',
541-
lambda: self.__try_import_observer('watchdog.observers.fsevents', 'FSEventsObserver')),
542-
]
543-
elif system == 'Windows':
544-
observers_to_try = [
545-
('WindowsApiObserver',
546-
lambda: self.__try_import_observer('watchdog.observers.read_directory_changes',
547-
'WindowsApiObserver')),
548-
]
549-
550-
# 尝试每个观察者
551-
for observer_name, observer_func in observers_to_try:
552-
try:
553-
observer_class = observer_func()
554-
if observer_class:
555-
# 尝试创建实例以验证是否可用
556-
test_observer = observer_class()
557-
test_observer.stop() # 立即停止测试实例
558-
logger.debug(f"成功初始化 {observer_name}")
559-
return observer_class()
560-
except Exception as e:
561-
logger.debug(f"初始化 {observer_name} 失败: {e}")
562-
continue
563-
564-
except Exception as e:
565-
logger.debug(f"选择观察者时出错: {e}")
566-
567-
logger.debug("所有快速监控模式都不可用,将使用兼容模式")
568-
return None
569-
570-
@staticmethod
571-
def __try_import_observer(module_name: str, class_name: str):
572-
"""
573-
尝试导入观察者类
574-
"""
575-
try:
576-
module = __import__(module_name, fromlist=[class_name])
577-
return getattr(module, class_name)
578-
except (ImportError, AttributeError) as e:
579-
logger.debug(f"导入 {module_name}.{class_name} 失败: {e}")
580-
return None
581-
582681
def polling_observer(self, storage: str, mon_paths: List[Path]):
583682
"""
584683
轮询监控(改进版)
@@ -738,17 +837,19 @@ def stop(self):
738837
"""
739838
退出监控
740839
"""
741-
self._event.set()
742-
if self._observers:
840+
if self._watchers:
743841
logger.info("正在停止本地目录监控服务...")
744-
for observer in self._observers:
842+
for watcher in self._watchers:
745843
try:
746-
observer.stop()
747-
observer.join()
748-
logger.debug(f"已停止监控服务: {observer}")
844+
watcher.stop()
845+
watcher.join(timeout=5)
846+
if watcher.is_alive():
847+
logger.warning(f"本地目录监控线程在5秒内未能停止: {watcher.watch_path}")
848+
else:
849+
logger.debug(f"已停止本地目录监控服务: {watcher.watch_path}")
749850
except Exception as e:
750851
logger.error(f"停止目录监控服务出现了错误:{e}")
751-
self._observers = []
852+
self._watchers = []
752853
logger.info("本地目录监控服务已停止")
753854
if self._scheduler:
754855
self._scheduler.remove_all_jobs()
@@ -763,4 +864,3 @@ def stop(self):
763864
self._cache.close()
764865
if self._snapshot_cache:
765866
self._snapshot_cache.close()
766-
self._event.clear()

requirements.in

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ starlette~=0.46.2
4848
PyVirtualDisplay~=3.0
4949
psutil~=7.0.0
5050
python-dotenv~=1.1.1
51-
watchdog~=6.0.0
5251
watchfiles~=1.1.0
5352
click~=8.2.1
5453
parse~=1.20.2

0 commit comments

Comments
 (0)