forked from PanicTitan/ComfyUI-Gallery
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
316 lines (284 loc) · 14.7 KB
/
Copy pathserver.py
File metadata and controls
316 lines (284 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
from server import PromptServer
from aiohttp import web
import os
import folder_paths
import time
from datetime import datetime
import json
import math
import pathlib
import threading
import queue
import asyncio
import shutil
from .folder_monitor import FileSystemMonitor
from .folder_scanner import _scan_for_images, DEFAULT_EXTENSIONS
from .gallery_config import disable_logs, gallery_log
# Add ComfyUI root to sys.path HERE
import sys
comfy_path = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(comfy_path)
monitor = None
# Placeholder directory. This *must* exist, even if it's empty.
PLACEHOLDER_DIR = os.path.join(comfy_path, "output") # os.path.abspath("./placeholder_static")
if not os.path.exists(PLACEHOLDER_DIR):
os.makedirs(PLACEHOLDER_DIR)
# Add a *placeholder* static route. This gets modified later.
PromptServer.instance.routes.static('/static_gallery', PLACEHOLDER_DIR, follow_symlinks=True, name='static_gallery_placeholder') #give a name to the route
# Initialize scan_lock here
PromptServer.instance.scan_lock = threading.Lock()
# Settings file for persistent user settings
SETTINGS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "user_settings.json")
def load_settings():
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
gallery_log(f"Error loading settings: {e}")
return {}
return {}
def save_settings_to_file(settings):
try:
with open(SETTINGS_FILE, 'w', encoding='utf-8') as f:
json.dump(settings, f, indent=4)
except Exception as e:
gallery_log(f"Error saving settings: {e}")
def sanitize_json_data(data):
"""Recursively sanitizes data to be JSON serializable."""
if isinstance(data, dict):
return {k: sanitize_json_data(v) for k, v in data.items()}
elif isinstance(data, list):
return [sanitize_json_data(item) for item in data]
elif isinstance(data, float):
if math.isnan(data) or math.isinf(data):
return None
return data
elif isinstance(data, (int, str, bool, type(None))):
return data
else:
return str(data)
@PromptServer.instance.routes.get("/Gallery/settings")
async def get_settings(request):
return web.json_response(load_settings())
@PromptServer.instance.routes.post("/Gallery/settings")
async def save_settings(request):
try:
data = await request.json()
save_settings_to_file(data)
return web.Response(text="Settings saved")
except Exception as e:
return web.Response(status=500, text=str(e))
@PromptServer.instance.routes.get("/Gallery/images")
async def get_gallery_images(request):
"""Endpoint to get gallery images, accepts relative_path."""
raw_rel = request.rel_url.query.get("relative_path", "./")
# Normalize query value: treat null/None/empty as root
if raw_rel is None or str(raw_rel).lower() == 'null' or str(raw_rel).strip() == "":
relative_path = "./"
else:
relative_path = raw_rel
# Fix: Only join if relative_path is not absolute or '.'
base_output_dir = folder_paths.get_output_directory()
if os.path.isabs(relative_path):
full_monitor_path = os.path.normpath(relative_path)
elif relative_path in ("./", ".", ""): # treat as root
full_monitor_path = base_output_dir
else:
full_monitor_path = os.path.normpath(os.path.join(base_output_dir, relative_path))
# Use a thread-safe queue to communicate between threads.
result_queue = queue.Queue()
def thread_target():
"""Target function for the scanning thread."""
with PromptServer.instance.scan_lock:
try:
# Load saved settings to determine extensions
saved = load_settings()
scan_extensions = saved.get('scanExtensions', DEFAULT_EXTENSIONS)
# Use the actual folder name as the root key
folder_name = os.path.basename(full_monitor_path)
folders_with_metadata, _ = _scan_for_images(
full_monitor_path, folder_name, True, scan_extensions
)
result_queue.put(folders_with_metadata) # Put the result in the queue
except Exception as e:
result_queue.put(e) # Put the exception in the queue
def on_scan_complete(folders_with_metadata):
"""Callback executed in the main thread to send the response."""
try:
if isinstance(folders_with_metadata, Exception):
gallery_log(f"Error in /Gallery/images: {folders_with_metadata}")
import traceback
traceback.print_exc()
return web.Response(status=500, text=str(folders_with_metadata))
sanitized_folders = sanitize_json_data(folders_with_metadata)
json_string = json.dumps({"folders": sanitized_folders})
return web.Response(text=json_string, content_type="application/json")
except Exception as e:
gallery_log(f"Error in on_scan_complete: {e}")
return web.Response(status=500, text=str(e))
# Start the scanning in a separate thread.
scan_thread = threading.Thread(target=thread_target)
scan_thread.start()
# Wait result and process it.
result = result_queue.get() # BLOCKING call
return on_scan_complete(result)
@PromptServer.instance.routes.post("/Gallery/monitor/start")
async def start_gallery_monitor(request):
"""Endpoint to start gallery monitoring, accepts relative_path."""
global monitor
from . import gallery_config
try:
data = await request.json()
# Normalize relative_path: if missing, null, or literal 'null', treat as root
relative_path = data.get("relative_path", "./")
if relative_path is None or str(relative_path).lower() == 'null' or str(relative_path).strip() == "":
relative_path = "./"
gallery_config.disable_logs = data.get("disable_logs", False)
gallery_config.use_polling_observer = data.get("use_polling_observer", False)
scan_extensions = data.get("scan_extensions", DEFAULT_EXTENSIONS)
disable_logs = gallery_config.disable_logs
use_polling_observer = gallery_config.use_polling_observer
full_monitor_path = os.path.normpath(os.path.join(folder_paths.get_output_directory(), "..", "output", relative_path))
gallery_log("disable_logs", disable_logs)
gallery_log("use_polling_observer", use_polling_observer)
if monitor and monitor.thread and monitor.thread.is_alive():
# Monitor is healthy — only restart if the path or settings changed.
current_path = os.path.normpath(str(monitor.base_path)) if monitor else None
settings_unchanged = (
current_path == full_monitor_path
and gallery_config.disable_logs == disable_logs
and gallery_config.use_polling_observer == use_polling_observer
)
if settings_unchanged:
gallery_log("FileSystemMonitor: Monitor already running with same settings, skipping restart.")
return web.Response(text="Gallery monitor already running", content_type="text/plain")
gallery_log("FileSystemMonitor: Settings changed, stopping previous monitor.")
monitor.stop_monitoring()
if not os.path.isdir(full_monitor_path):
return web.Response(status=400, text=f"Invalid relative_path: {relative_path}, path not found")
for route in PromptServer.instance.app.router.routes():
if route.name == 'static_gallery_placeholder':
route.resource._directory = pathlib.Path(full_monitor_path)
gallery_log(f"Serving static files from {full_monitor_path} at /static_gallery")
break
else:
gallery_log("Error: Placeholder static route not found!")
return web.Response(status=500, text="Placeholder route not found.")
monitor = FileSystemMonitor(full_monitor_path, interval=1.0, use_polling_observer=use_polling_observer, extensions=scan_extensions)
monitor.start_monitoring()
return web.Response(text="Gallery monitor started", content_type="text/plain")
except Exception as e:
gallery_log(f"Error starting gallery monitor: {e}")
import traceback
traceback.print_exc()
return web.Response(status=500, text=str(e))
@PromptServer.instance.routes.post("/Gallery/monitor/stop")
async def stop_gallery_monitor(request):
"""Endpoint to stop gallery monitoring."""
global monitor
from .gallery_config import gallery_log
if monitor and monitor.thread and monitor.thread.is_alive():
monitor.stop_monitoring()
monitor = None
for route in PromptServer.instance.app.router.routes():
if route.name == 'static_gallery_placeholder':
route.resource._directory = pathlib.Path(PLACEHOLDER_DIR)
gallery_log(f"Serving static files from {PLACEHOLDER_DIR} at /static_gallery")
break
return web.Response(text="Gallery monitor stopped", content_type="text/plain")
@PromptServer.instance.routes.patch("/Gallery/updateImages")
async def newSettings(request):
# This route is no longer used
return web.Response(status=200)
@PromptServer.instance.routes.post("/Gallery/delete")
async def delete_image(request):
"""Endpoint to delete an image."""
from .gallery_config import gallery_log
try:
data = await request.json()
image_url = data.get("image_path")
if not image_url:
return web.Response(status=400, text="image_path is required")
if image_url.startswith("/static_gallery/"):
relative_path = image_url[len("/static_gallery/"):]
else:
return web.Response(status=400, text="Invalid image_path format")
static_route = next((r for r in PromptServer.instance.app.router.routes() if getattr(r, 'name', None) == 'static_gallery_placeholder'), None)
if static_route is not None:
static_dir = str(static_route.resource._directory)
else:
static_dir = folder_paths.get_output_directory()
full_image_path = os.path.realpath(os.path.join(static_dir, relative_path))
real_static_dir = os.path.realpath(static_dir)
if not os.path.exists(full_image_path):
return web.Response(status=404, text=f"File not found: {full_image_path}")
if not os.path.normcase(full_image_path).startswith(
os.path.normcase(real_static_dir + os.sep)
):
return web.Response(status=403, text="Access denied: File outside of static directory")
try:
from send2trash import send2trash
send2trash(full_image_path)
gallery_log(f"Image moved to trash: {full_image_path}")
except Exception as e:
gallery_log(f"send2trash unavailable or failed ({type(e).__name__}: {e}), "
"falling back to permanent deletion.")
os.remove(full_image_path)
gallery_log(f"Image permanently deleted: {full_image_path}")
return web.Response(text=f"Image deleted: {image_url}")
except Exception as e:
gallery_log(f"Error deleting image: {e}")
return web.Response(status=500, text=str(e))
@PromptServer.instance.routes.post("/Gallery/move")
async def move_image(request):
"""Endpoint to move an image to a new location, relative to the current gallery root (current_path)."""
from .gallery_config import disable_logs, gallery_log
try:
data = await request.json()
source_path = data.get("source_path")
target_path = data.get("target_path")
current_path = data.get("current_path") or data.get("relative_path") or "./"
gallery_log(f"source_path: {source_path}")
gallery_log(f"target_path: {target_path}")
gallery_log(f"current_path: {current_path}")
if not source_path or not target_path:
return web.Response(status=400, text="source_path and target_path are required")
static_route = next((r for r in PromptServer.instance.app.router.routes() if getattr(r, 'name', None) == 'static_gallery_placeholder'), None)
if static_route is not None:
static_dir = str(static_route.resource._directory)
else:
static_dir = folder_paths.get_output_directory()
static_dir_basename = os.path.basename(os.path.normpath(static_dir))
def make_path(p):
if os.path.isabs(p):
return os.path.normpath(p)
if p.startswith(static_dir_basename + os.sep):
p = p[len(static_dir_basename + os.sep):]
elif p.startswith(static_dir_basename + "/"):
p = p[len(static_dir_basename + "/") :]
return os.path.normpath(os.path.join(static_dir, p))
full_source_path = make_path(source_path)
full_target_path = make_path(target_path)
gallery_log(f"static_dir: {static_dir}")
gallery_log(f"full_source_path: {full_source_path}")
gallery_log(f"full_target_path: {full_target_path}")
if not os.path.exists(full_source_path):
return web.Response(status=404, text=f"Source file not found: {full_source_path}")
if not os.path.realpath(full_source_path).startswith(os.path.realpath(static_dir)) or \
not os.path.realpath(full_target_path).startswith(os.path.realpath(static_dir)) or \
not os.path.realpath(full_source_path).startswith(os.path.realpath(comfy_path)) or \
not os.path.realpath(full_target_path).startswith(os.path.realpath(comfy_path)):
return web.Response(status=403, text="Access denied: File outside of allowed directory")
if os.path.isdir(full_target_path):
full_target_path = os.path.join(full_target_path, os.path.basename(full_source_path))
target_dir = os.path.dirname(full_target_path)
if not os.path.exists(target_dir):
os.makedirs(target_dir, exist_ok=True)
shutil.move(full_source_path, full_target_path)
return web.Response(text=f"Image moved from {source_path} to {target_path}")
except Exception as e:
gallery_log(f"Error moving image: {e}")
import traceback
traceback.print_exc()
return web.Response(status=500, text=str(e))