-
Notifications
You must be signed in to change notification settings - Fork 72
Expand file tree
/
Copy pathasset_processor.py
More file actions
376 lines (311 loc) · 13.6 KB
/
Copy pathasset_processor.py
File metadata and controls
376 lines (311 loc) · 13.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
"""Image processing utilities for asset viewing and thumbnail generation"""
import base64
import logging
import os
import requests
from dataclasses import dataclass
from io import BytesIO
from typing import Dict, Any, Optional, Tuple, Union
try:
from PIL import Image, ImageOps
PIL_AVAILABLE = True
except ImportError:
PIL_AVAILABLE = False
logging.warning("Pillow not available. Image processing features will be limited.")
# Module-level logger shared by every function in this file; records are
# emitted under the fixed logger name "AssetProcessor".
logger = logging.getLogger("AssetProcessor")
# Simple in-memory cache for processed previews.
# Maps a cache-key string to the EncodedImage produced for it. The value type
# is written as a quoted forward reference because EncodedImage is defined
# further down in this module. Being a plain module-level dict, its contents
# last only as long as the module stays loaded.
_preview_cache: Dict[str, "EncodedImage"] = {}
def fetch_asset_bytes(asset_url: str, timeout: int = 30) -> bytes:
    """Download an asset over HTTP (e.g. from a ComfyUI /view URL).

    Args:
        asset_url: URL to request with a GET.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        The raw response body.

    Raises:
        requests.RequestException: If the request fails or the server answers
            with an error status. The failure is logged, then re-raised.
    """
    try:
        resp = requests.get(asset_url, timeout=timeout)
        # Turn 4xx/5xx answers into exceptions so they hit the handler below.
        resp.raise_for_status()
    except requests.RequestException as exc:
        logger.error(f"Failed to fetch asset from {asset_url}: {exc}")
        raise
    return resp.content
def get_image_metadata(image_bytes: bytes) -> Dict[str, Any]:
    """Read width, height and format from encoded image bytes.

    Args:
        image_bytes: Encoded image data in any format Pillow can open.

    Returns:
        A dict with the keys ``"width"``, ``"height"`` and ``"format"``.
        All three values are ``None`` when Pillow is not installed or the
        bytes cannot be opened as an image.
    """
    unknown: Dict[str, Any] = {"width": None, "height": None, "format": None}
    if not PIL_AVAILABLE:
        return unknown
    try:
        with Image.open(BytesIO(image_bytes)) as picture:
            return dict(
                width=picture.width,
                height=picture.height,
                format=picture.format,
            )
    except Exception as exc:
        # Best effort: an unreadable image is reported as "unknown", not raised.
        logger.warning(f"Failed to extract image metadata: {exc}")
        return unknown
def should_downscale(width: int, height: int, max_dim: int) -> bool:
    """Return True when the longer side of the image exceeds *max_dim*."""
    longest_side = max(width, height)
    return longest_side > max_dim
def create_thumbnail(
    image_bytes: bytes,
    max_dim: int = 512,
    quality: int = 75,
    format: str = "JPEG"
) -> bytes:
    """Create a downscaled, re-encoded thumbnail (JPEG by default).

    The image is flattened to RGB first: transparent modes (RGBA, LA, and
    palette images, which are expanded to RGBA) are composited onto a white
    background; every other non-RGB mode is converted directly.

    Args:
        image_bytes: Encoded source image in any format Pillow can open.
        max_dim: Maximum length in pixels of the longer side. Smaller images
            are not enlarged.
        quality: Encoder quality passed to ``Image.save``.
        format: Output format name passed to ``Image.save``. The name shadows
            the builtin but is kept because it is part of the public signature.

    Returns:
        The encoded thumbnail bytes.

    Raises:
        ImportError: If Pillow is not installed.
        Exception: Any error raised while decoding, resizing or encoding is
            logged and re-raised unchanged.
    """
    if not PIL_AVAILABLE:
        raise ImportError("Pillow is required for image processing")
    try:
        with Image.open(BytesIO(image_bytes)) as img:
            # Convert to RGB if necessary (for JPEG)
            if img.mode in ("RGBA", "LA", "P"):
                # Create white background for transparency
                background = Image.new("RGB", img.size, (255, 255, 255))
                if img.mode == "P":
                    img = img.convert("RGBA")
                # img is RGBA or LA here, so its last band is always alpha.
                background.paste(img, mask=img.split()[-1])
                img = background
            elif img.mode != "RGB":
                img = img.convert("RGB")

            # Scale the longer side to max_dim and the shorter side
            # proportionally. The shorter side is clamped to at least 1px:
            # for extreme aspect ratios (e.g. 10000x1) plain int() truncation
            # gave 0, which made resize() fail.
            width, height = img.size
            if width > max_dim or height > max_dim:
                if width > height:
                    new_width = max_dim
                    new_height = max(1, int(height * (max_dim / width)))
                else:
                    new_height = max_dim
                    new_width = max(1, int(width * (max_dim / height)))
                img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)

            # Save to bytes
            output = BytesIO()
            img.save(output, format=format, quality=quality, optimize=True)
            return output.getvalue()
    except Exception as e:
        logger.error(f"Failed to create thumbnail: {e}")
        raise
def strip_metadata(image_bytes: bytes) -> bytes:
    """Re-encode an image from its pixel data only, dropping EXIF and other metadata.

    A fresh image is built from the raw pixel buffer, so nothing stored in the
    source image's ``info`` dictionary is carried over. PNG input is written
    back as PNG; every other input format is written as JPEG at quality 95,
    which is a lossy re-encode.

    This is best effort: on any failure the original bytes are returned
    unchanged and a warning is logged.
    NOTE(review): modes JPEG cannot store (e.g. RGBA from a non-PNG source)
    presumably fail in ``save`` and take that fallback, leaving metadata in
    place. Confirm whether that is acceptable.

    Args:
        image_bytes: Encoded source image.

    Returns:
        The re-encoded image bytes, or ``image_bytes`` itself when Pillow is
        unavailable or processing fails.
    """
    if not PIL_AVAILABLE:
        return image_bytes
    try:
        with Image.open(BytesIO(image_bytes)) as img:
            # Preserve format if possible
            source_format = img.format or "JPEG"

            # Copy the pixels through the raw buffer instead of building a
            # Python list with one entry per pixel.
            clean = Image.frombytes(img.mode, img.size, img.tobytes())

            # Palette images store colour indices. Without the palette the
            # copy would be rendered with the wrong colours, so carry it over.
            if img.mode == "P":
                palette = img.getpalette()
                if palette is not None:
                    clean.putpalette(palette)

            # Save to bytes
            output = BytesIO()
            if source_format == "PNG":
                clean.save(output, format="PNG", optimize=True)
            else:
                clean.save(output, format="JPEG", quality=95, optimize=True)
            return output.getvalue()
    except Exception as e:
        logger.warning(f"Failed to strip metadata, returning original: {e}")
        return image_bytes
@dataclass(frozen=True)
class EncodedImage:
    """Immutable result of encoding a preview image, with its size metrics.

    Attributes:
        b64: Base64 text of the encoded image, without a data URI prefix.
        mime_type: MIME type of the encoded data (``image/webp``).
        size_px: Final pixel dimensions of the encoded image.
        bytes_len: Size of the encoded image in bytes, before base64.
        b64_chars: Number of base64 characters; this is what counts toward
            the size of a serialized response.
        raw_bytes: The encoded image bytes themselves (for FastMCP.Image).
    """

    b64: str
    mime_type: str
    size_px: Tuple[int, int]
    bytes_len: int
    b64_chars: int
    raw_bytes: bytes
def get_cache_key(asset_id: str, max_dim: int, quality: int) -> str:
    """Build the cache key that identifies one processed preview.

    The key joins the asset id, the size limit, the fixed output format
    ``webp`` and the quality with colons, so previews made with different
    settings get different keys.
    """
    key_template = "{}:{}:webp:{}"
    return key_template.format(asset_id, max_dim, quality)
def _get_cached_preview(cache_key: str) -> Optional[EncodedImage]:
    """Look up a stored preview by key; return None when nothing is stored."""
    try:
        return _preview_cache[cache_key]
    except KeyError:
        return None
def _cache_preview(cache_key: str, encoded: EncodedImage, max_entries: int = 100) -> None:
    """Store a processed preview, keeping at most *max_entries* entries.

    Eviction is by insertion order: when the cache is full, the entry stored
    longest ago is removed first. Lookups do not refresh an entry's position,
    so this is a bounded FIFO rather than a true LRU.

    Args:
        cache_key: Key to store the preview under.
        encoded: The preview to store.
        max_entries: Upper bound on the number of cached previews
            (default 100). Values below 1 are treated as 1.
    """
    limit = max(1, max_entries)
    # Re-storing an existing key replaces it in place of evicting an unrelated
    # entry, and moves it to the newest position.
    _preview_cache.pop(cache_key, None)
    # Make room before inserting so the size never exceeds the limit. The
    # previous "> 100" check allowed the cache to grow to 101 entries.
    while _preview_cache and len(_preview_cache) >= limit:
        oldest_key = next(iter(_preview_cache))
        _preview_cache.pop(oldest_key, None)
    _preview_cache[cache_key] = encoded
def estimate_response_chars(b64_chars: int, json_overhead: int = 200) -> int:
    """Estimate the total serialized response size, for logging and debugging.

    This is a rough figure: the base64 payload length plus a flat allowance
    for the JSON structure and surrounding text.
    """
    estimated_total = json_overhead + b64_chars
    return estimated_total
def mcp_image_content(encoded: EncodedImage) -> dict:
    """Build an MCP ImageContent-style dict from an EncodedImage.

    The ``data`` field carries the image as a data URI of the form
    ``data:<mime type>;base64,<base64>``, and ``mimeType`` repeats the MIME type.
    NOTE(review): the MCP specification describes ``data`` as bare base64;
    confirm that consumers of this dict expect the data URI form.
    """
    mime = encoded.mime_type
    data_uri = "data:{};base64,{}".format(mime, encoded.b64)
    return dict(type="image", data=data_uri, mimeType=mime)
def _scaled_size(size: Tuple[int, int], target: int) -> Tuple[int, int]:
    """Shrink *size* so its longer side is at most *target*; never enlarge."""
    width, height = size
    longest = max(width, height)
    if longest <= target:
        return (width, height)
    scale = target / longest
    # Clamp to 1px so extreme aspect ratios never produce a zero-sized side.
    return (max(1, int(width * scale)), max(1, int(height * scale)))


def _encode_webp(im: "Image.Image", quality: int) -> bytes:
    """Encode *im* as lossy WebP at *quality* and return the raw bytes."""
    buf = BytesIO()
    # Method 5 trades CPU for size (good balance). Alpha is kept when present.
    im.save(buf, format="WEBP", quality=quality, method=5)
    return buf.getvalue()


def encode_preview_for_mcp(
    image_source: Union[str, bytes, BytesIO],
    *,
    max_dim: int = 512,
    max_b64_chars: int = 100_000,  # Base64 character budget (100KB - conservative to prevent hangs)
    quality: int = 70,
    strip_metadata: bool = True,
    cache_key: Optional[str] = None,
) -> EncodedImage:
    """
    Load an image, downscale it, re-encode it to WebP within a base64 budget.

    Designed for MCP tool responses where serialized payload size matters.
    The budget applies to the base64 character count plus the
    ``data:image/webp;base64,`` prefix, not to the raw byte size.

    A deterministic ladder is walked until the payload fits: for each size
    rung (``max_dim``, then 384 and 256 where those are smaller than
    ``max_dim``), each quality rung (``quality``, then 55 and 40 where those
    are lower than ``quality``) is tried. On the smallest size rung, quality
    35 is tried last. Rungs never exceed ``max_dim`` or ``quality``, and an
    identical size/quality pair is encoded only once.

    Args:
        image_source: URL (``http://`` or ``https://`` string), file path
            (any other string), bytes-like data, or a BytesIO / file object.
        max_dim: Maximum dimension in pixels (default: 512, hard cap).
        max_b64_chars: Maximum payload character count including the data URI
            prefix (default: 100000).
        quality: Starting quality level (default: 70).
        strip_metadata: Accepted for backward compatibility. This function
            does not read it and passes no EXIF/ICC data to the encoder.
            NOTE(review): confirm the WebP writer embeds no metadata by
            default. Inside this function the name shadows the module-level
            ``strip_metadata`` function.
        cache_key: Optional cache key. When given, a cached result for the
            key is returned without re-encoding, and new results are stored.

    Returns:
        EncodedImage with base64, mime_type, dimensions, and metrics.

    Raises:
        ValueError: If the image still exceeds the budget at the smallest
            size and lowest quality.
        ImportError: If Pillow is not available.
        FileNotFoundError: If ``image_source`` is a path that does not exist.
    """
    if not PIL_AVAILABLE:
        raise ImportError("Pillow is required for image processing. Install with: pip install Pillow")

    # Check cache first
    if cache_key:
        cached = _get_cached_preview(cache_key)
        if cached is not None:
            logger.debug(f"Cache hit for {cache_key}")
            return cached

    # Load image from various sources and track source size (for telemetry)
    src_bytes = 0
    if isinstance(image_source, str):
        if image_source.startswith(("http://", "https://")):
            image_bytes = fetch_asset_bytes(image_source)
            src_bytes = len(image_bytes)
            img_source = BytesIO(image_bytes)
        else:
            # File path
            if not os.path.exists(image_source):
                raise FileNotFoundError(image_source)
            src_bytes = os.path.getsize(image_source)
            img_source = image_source
    elif isinstance(image_source, (bytes, bytearray, memoryview)):
        img_source = BytesIO(image_source)
        src_bytes = img_source.getbuffer().nbytes
    else:
        # BytesIO or another file-like object; only BytesIO exposes its size.
        img_source = image_source
        if isinstance(image_source, BytesIO):
            src_bytes = image_source.getbuffer().nbytes

    # Load and normalize image
    with Image.open(img_source) as loaded_im:
        # Apply EXIF orientation correction (returns new Image object)
        im = ImageOps.exif_transpose(loaded_im)
        src_w, src_h = im.size
        # WebP only for now: keep alpha (RGBA/LA) and plain RGB/L as they are,
        # convert every other mode to RGB.
        if im.mode not in ("RGBA", "LA", "RGB", "L"):
            im = im.convert("RGB")

    data_uri_prefix_len = len("data:image/webp;base64,")

    # Strictly descending ladders. Filtering against the caller's values keeps
    # max_dim a real hard cap (e.g. max_dim=128 must never fall "down" to 384)
    # and stops the quality from stepping up.
    dim_ladder = [max_dim] + [d for d in (384, 256) if d < max_dim]
    quality_ladder = [quality] + [q for q in (55, 40) if q < quality]
    smallest_dim = dim_ladder[-1]
    last_resort_quality = 35

    final_encoded = None
    final_q = None
    final_dim = None
    b64_string = ""
    b64_chars = 0
    q = quality
    attempted = set()  # (size, quality) pairs already encoded

    for target in dim_ladder:
        size = _scaled_size(im.size, target)
        qualities = list(quality_ladder)
        # Last attempt: smallest size with the lowest quality.
        if target == smallest_dim and qualities[-1] > last_resort_quality:
            qualities.append(last_resort_quality)
        # Images already smaller than several rungs map to the same size;
        # skip combinations that were tried on an earlier rung.
        pending = [level for level in qualities if (size, level) not in attempted]
        if not pending:
            continue

        if size == im.size:
            frame = im
        else:
            frame = im.resize(size, Image.Resampling.LANCZOS)

        for q in pending:
            attempted.add((size, q))
            encoded_bytes = _encode_webp(frame, q)
            b64_string = base64.b64encode(encoded_bytes).decode("ascii")
            b64_chars = len(b64_string)
            # Check if within budget (including the data URI prefix)
            if b64_chars + data_uri_prefix_len <= max_b64_chars:
                final_encoded = encoded_bytes
                final_q = q
                final_dim = frame.size
                break
        if final_encoded is not None:
            break

    if final_encoded is None:
        # Refuse to inline - exceeds budget even at minimum settings
        raise ValueError(
            f"Image exceeds base64 budget: {b64_chars} chars > {max_b64_chars} chars "
            f"(even at {smallest_dim}px, quality={q}). Refusing to inline."
        )

    total_payload_chars = b64_chars + data_uri_prefix_len

    result = EncodedImage(
        b64=b64_string,
        mime_type="image/webp",
        size_px=final_dim,
        bytes_len=len(final_encoded),
        b64_chars=b64_chars,
        raw_bytes=final_encoded,  # Raw bytes for FastMCP.Image
    )

    # Cache result
    if cache_key:
        _cache_preview(cache_key, result)

    # Log telemetry
    logger.info(
        f"view_asset encoding: src={src_bytes}B src_dims={src_w}x{src_h} "
        f"preview_dims={final_dim[0]}x{final_dim[1]} format=webp quality={final_q} "
        f"encoded={len(final_encoded)}B b64_chars={b64_chars} total_payload={total_payload_chars} "
        f"response_est={estimate_response_chars(total_payload_chars)}chars"
    )
    return result