forked from PanicTitan/ComfyUI-Gallery
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfolder_monitor.py
More file actions
245 lines (194 loc) · 10.4 KB
/
Copy pathfolder_monitor.py
File metadata and controls
245 lines (194 loc) · 10.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# folder_monitor.py
import os
import time
import threading
from watchdog.observers import Observer
from watchdog.observers.polling import PollingObserver
from watchdog.events import FileSystemEventHandler, PatternMatchingEventHandler
from .folder_scanner import _scan_for_images # Import folder scanner
import asyncio
from server import PromptServer
import queue
from .gallery_config import gallery_log
class GalleryEventHandler(PatternMatchingEventHandler):
    """Debounce file system events and push gallery diffs to the frontend.

    Every relevant file event (re)arms a timer; when the timer fires the
    base path is rescanned, the result is compared against
    ``last_known_folders`` and any differences are sent as a
    ``Gallery.file_change`` message.

    Args:
        base_path: Directory being monitored; resolved with ``os.path.realpath``.
        patterns, ignore_patterns, ignore_directories, case_sensitive:
            Forwarded unchanged to ``PatternMatchingEventHandler``.
        debounce_interval: Seconds used both to suppress repeated identical
            events and as the delay before a rescan is started.
        extensions: Passed through to ``_scan_for_images`` on every rescan.
    """

    # Suffixes of temporary files that should never trigger a rescan.
    _TEMP_SUFFIXES = ('.swp', '.tmp', '~')
    # Event types that can change what the gallery shows.
    _RELEVANT_EVENT_TYPES = ('created', 'deleted', 'modified', 'moved')

    def __init__(self, base_path, patterns=None, ignore_patterns=None, ignore_directories=False, case_sensitive=True, debounce_interval=0.5, extensions=None):
        super().__init__(patterns=patterns, ignore_patterns=ignore_patterns, ignore_directories=ignore_directories, case_sensitive=case_sensitive)
        self.base_path = os.path.realpath(base_path)  # Use realpath for base_path
        self.debounce_timer = None
        self.debounce_interval = debounce_interval
        # Maps (event_type, real_path) -> time.monotonic() of the last accepted event.
        self.processed_events = {}
        # Retained for backward compatibility; scans no longer hand results
        # over through a queue because they run directly on the timer thread.
        self.result_queue = queue.Queue()
        self.running_scan = False  # True while a rescan loop is active
        self.extensions = extensions
        self.last_known_folders = {}  # Snapshot the next scan is diffed against
        # Guards running_scan / _rescan_pending, which are touched by
        # several timer threads when a scan outlives the debounce interval.
        self._scan_lock = threading.Lock()
        self._rescan_pending = False
        self._last_prune = time.monotonic()

    def on_any_event(self, event):
        """Filter an incoming event and (re)arm the debounce timer.

        Directory events, temporary files, irrelevant event types and
        repeats of the same ``(event_type, real_path)`` within
        ``debounce_interval`` are ignored.
        """
        if event.is_directory:
            return
        if event.event_type not in self._RELEVANT_EVENT_TYPES:
            return

        # A rename carries two paths. Only skip it when *every* path involved
        # is a temp file, so "write to x.tmp, rename to x.png" is still seen.
        # os.fsdecode keeps the suffix test working if a path arrives as bytes.
        involved_paths = [os.fsdecode(event.src_path)]
        if event.event_type == 'moved':
            dest_path = getattr(event, 'dest_path', None)
            if dest_path:
                involved_paths.append(os.fsdecode(dest_path))
        if all(path.endswith(self._TEMP_SUFFIXES) for path in involved_paths):
            return

        real_path = os.path.realpath(event.src_path)
        event_key = (event.event_type, real_path)
        # Monotonic clock: interval arithmetic must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()
        last_seen = self.processed_events.get(event_key)
        if last_seen is not None and now - last_seen < self.debounce_interval:
            return

        self._prune_processed_events(now)
        self.processed_events[event_key] = now

        gallery_log(f"Watchdog detected {event.event_type}: {event.src_path} (Real path: {real_path}) - debouncing")
        self.debounce_event()

    def _prune_processed_events(self, now):
        """Drop bookkeeping entries too old to suppress anything.

        Runs at most once per ``debounce_interval`` so a burst of events does
        not rescan the whole dictionary for every single event.
        """
        if now - self._last_prune < self.debounce_interval:
            return
        self._last_prune = now
        cutoff = now - self.debounce_interval
        self.processed_events = {
            key: seen for key, seen in self.processed_events.items() if seen > cutoff
        }

    def debounce_event(self):
        """Restart the debounce timer so the rescan runs after a quiet period."""
        previous_timer = self.debounce_timer
        if previous_timer is not None:
            # cancel() is a no-op for a timer that already fired.
            previous_timer.cancel()
        timer = threading.Timer(self.debounce_interval, self.rescan_and_send_changes)
        # Daemon, so a pending or running rescan never blocks interpreter exit.
        timer.daemon = True
        self.debounce_timer = timer
        timer.start()

    def rescan_and_send_changes(self):
        """Rescan the base path and send any detected changes.

        Only one scan runs at a time. A request that arrives while a scan is
        in progress is remembered and served by one follow-up scan, so changes
        made during a long scan are not lost.
        """
        with self._scan_lock:
            already_running = self.running_scan
            if already_running:
                self._rescan_pending = True
            else:
                self.running_scan = True
        if already_running:
            gallery_log("Another scan is running, queuing a follow-up scan")
            return

        try:
            while True:
                self._scan_once()
                with self._scan_lock:
                    if not self._rescan_pending:
                        self.running_scan = False
                        return
                    self._rescan_pending = False
        except BaseException:
            # Never leave the flag stuck, otherwise every later scan is skipped.
            with self._scan_lock:
                self.running_scan = False
                self._rescan_pending = False
            raise

    def _scan_once(self):
        """Run one scan, diff it against the last snapshot and send the diff.

        ``last_known_folders`` is only advanced after a successful send (or
        when there was nothing to send), so a failed send is retried by the
        next scan.
        """
        try:
            folder_name = os.path.basename(self.base_path)
            # Pass configured extensions to the scanner
            new_folders_data, _ = _scan_for_images(self.base_path, folder_name, True, self.extensions)
            changes = detect_folder_changes(self.last_known_folders, new_folders_data)
        except Exception as e:
            gallery_log(f"FileSystemMonitor: Error during scan: {e}")
            return

        # The diff always contains the "folders" key, so the dict itself is
        # always truthy; what matters is whether any folder actually changed.
        if changes.get("folders"):
            gallery_log("FileSystemMonitor: Changes detected after debounce, sending updates")
            try:
                from .server import sanitize_json_data
                PromptServer.instance.send_sync("Gallery.file_change", sanitize_json_data(changes))
            except Exception as e:
                gallery_log(f"FileSystemMonitor: Error sending updates: {e}")
                return
        else:
            gallery_log("FileSystemMonitor: Changes detected by watchdog, but no relevant gallery changes after debounce.")
        self.last_known_folders = new_folders_data  # Update last_known_folders.
class FileSystemMonitor:
    """Run a watchdog observer on ``base_path`` in a background thread.

    The background thread performs an initial scan, then keeps an observer
    alive, rebuilding it if it dies, until ``stop_monitoring`` is called.

    Args:
        base_path: Directory to watch recursively.
        interval: Stored on the instance.
            NOTE(review): this value is never passed to the observer; confirm
            whether it was meant to be the polling timeout.
        use_polling_observer: Use ``PollingObserver`` instead of the platform
            default ``Observer``.
        extensions: File extensions used to build the handler's match
            patterns and passed to the scanner; ``None``/empty matches all.
    """

    _RESTART_DELAY = 3.0   # seconds to wait before rebuilding a dead observer
    _POLL_INTERVAL = 0.5   # seconds between observer liveness checks
    _JOIN_TIMEOUT = 5.0    # upper bound when waiting for a thread to finish

    def __init__(self, base_path, interval=1.0, use_polling_observer=False, extensions=None):
        self.base_path = base_path
        self.interval = interval
        self.use_polling_observer = use_polling_observer
        self.extensions = extensions
        self.observer = self._new_observer()
        # Generate patterns from extensions if provided
        if self.extensions:
            patterns = [f"*{ext}" if ext.startswith('.') else f"*.{ext}" for ext in self.extensions]
        else:
            patterns = ["*"]
        self.event_handler = GalleryEventHandler(base_path=base_path, patterns=patterns, debounce_interval=0.5, extensions=self.extensions)
        # Do NOT perform a blocking scan in __init__ to avoid startup freeze.
        # Initial scan will be performed in the observer thread.
        self.thread = None
        # Set by stop_monitoring(); tells the monitor thread that the observer
        # going away is intentional and must not be answered with a restart.
        self._stop_event = threading.Event()

    def _new_observer(self):
        """Create an unstarted observer of the configured kind."""
        if self.use_polling_observer:
            return PollingObserver()
        return Observer()

    def start_monitoring(self):
        """Start the background monitoring thread unless it is already running."""
        if self.thread is None or not self.thread.is_alive():
            # A fresh event per run: a previous thread that is still winding
            # down keeps seeing its own (set) event and cannot be revived.
            stop_event = threading.Event()
            self._stop_event = stop_event
            self.thread = threading.Thread(target=self._start_observer_thread, args=(stop_event,), daemon=True)
            self.thread.start()
            gallery_log("FileSystemMonitor: Watchdog monitoring thread started.")
        else:
            gallery_log("FileSystemMonitor: Watchdog monitoring thread already running.")

    def _start_observer_thread(self, stop_event=None):
        """Thread body: initial scan, then supervise the observer until stopped.

        Args:
            stop_event: ``threading.Event`` that ends the loop when set;
                defaults to the instance's current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event

        # Perform an initial background scan before scheduling the observer
        try:
            folder_name = os.path.basename(self.base_path)
            gallery_log("FileSystemMonitor: Starting initial background scan...")
            initial_data, _ = _scan_for_images(self.base_path, folder_name, True, self.extensions)
            self.event_handler.last_known_folders = initial_data
            gallery_log("FileSystemMonitor: Initial background scan complete.")
        except Exception as e:
            gallery_log(f"FileSystemMonitor: Error during initial scan: {e}")

        # Outer restart loop: if the observer crashes for any reason, rebuild and retry.
        while not stop_event.is_set():
            observer = None
            try:
                observer = self._new_observer()
                self.observer = observer
                observer.schedule(self.event_handler, self.base_path, recursive=True)
                # NOTE(review): this only sets an attribute on the observer
                # object; confirm the installed watchdog version reads it.
                observer.follow_directory_symlinks = True
                observer.start()
                gallery_log("FileSystemMonitor: Observer started.")
                # Event.wait doubles as an interruptible sleep.
                while observer.is_alive() and not stop_event.wait(self._POLL_INTERVAL):
                    pass
                if stop_event.is_set():
                    break
                gallery_log("FileSystemMonitor: Observer stopped unexpectedly, restarting...")
            except KeyboardInterrupt:
                stop_event.set()
                break
            except Exception as e:
                gallery_log(f"FileSystemMonitor: Observer crashed ({e}), restarting in 3s...")
            finally:
                # Fully shut the old observer down before a replacement is
                # created, so the handler is never driven by two observers.
                self._shutdown_observer(observer)
            # Back off before restarting; returns early if a stop is requested.
            stop_event.wait(self._RESTART_DELAY)

    def _shutdown_observer(self, observer):
        """Stop ``observer`` and wait (bounded) for its thread to finish."""
        if observer is None:
            return
        try:
            observer.stop()
            # join() on a thread that was never started raises, hence the check.
            if observer.is_alive():
                observer.join(timeout=self._JOIN_TIMEOUT)
        except Exception as e:
            gallery_log(f"FileSystemMonitor: Error while stopping observer: {e}")

    def stop_monitoring(self):
        """Stop the observer and end the background monitoring thread."""
        thread = self.thread
        if thread and thread.is_alive():
            # Signal first, so the monitor thread does not mistake the
            # stopped observer for a crash and restart it.
            self._stop_event.set()
            self._shutdown_observer(self.observer)
            # Joining the current thread would raise RuntimeError.
            if thread is not threading.current_thread():
                thread.join(timeout=self._JOIN_TIMEOUT)
            self.thread = None
            gallery_log("FileSystemMonitor: Watchdog monitoring thread stopped.")
        else:
            gallery_log("FileSystemMonitor: Watchdog monitoring thread was not running.")
# --- Helper function to detect folder changes ---
def detect_folder_changes(old_folders, new_folders):
    """Return the per-file differences between two folder snapshots.

    Both arguments map a folder name to a mapping of filename -> file data.
    The result always has the shape ``{"folders": {...}}``; a folder appears
    in it only when at least one of its files was created, removed or updated.

    Each changed file maps to a dict carrying an ``"action"`` key:
    ``"create"`` and ``"update"`` also include the new file data, while
    ``"remove"`` carries nothing else.
    """
    result = {"folders": {}}

    for folder in set(old_folders) | set(new_folders):
        before = old_folders.get(folder, {})
        after = new_folders.get(folder, {})
        per_file = {}

        for name in set(before) | set(after):
            if name not in before:
                # Present only in the new snapshot.
                per_file[name] = {"action": "create", **after.get(name)}
                continue
            if name not in after:
                # Present only in the old snapshot.
                per_file[name] = {"action": "remove"}
                continue
            current = after.get(name)
            if before.get(name) != current:
                # Present in both, but the recorded data differs.
                per_file[name] = {"action": "update", **current}

        if per_file:
            result["folders"][folder] = per_file

    return result