diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index adc2725..6d29ae8 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -30,7 +30,7 @@ jobs:
- name: Set up Python
uses: actions/setup-python@v5
with:
- python-version: '3.13'
+ python-version: '3.12'
- name: Install system deps
run: |
@@ -98,7 +98,7 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
- python-version: '3.13'
+ python-version: '3.12'
- name: Cache pip
uses: actions/cache@v4
with:
diff --git a/.github/workflows/release-build.yml b/.github/workflows/release-build.yml
index c55e493..7e99160 100644
--- a/.github/workflows/release-build.yml
+++ b/.github/workflows/release-build.yml
@@ -147,6 +147,7 @@ jobs:
--icon assets/app_icon.ico `
--paths src `
--hidden-import core.build_info `
+ --hidden-import mediapipe `
--add-data "src/ui/dark_theme.qss;." `
--add-data "assets/app_icon.ico;." `
--add-data "assets/app_icon.png;." `
@@ -161,6 +162,8 @@ jobs:
--hidden-import torch `
--hidden-import sklearn `
--hidden-import sentence_transformers `
+ --collect-data mediapipe `
+ --collect-data pyiqa `
--add-data "models;models" `
--runtime-hook runtime_hook.py `
src/main.py
@@ -214,6 +217,7 @@ jobs:
--icon assets/photosort.icns \
--paths src \
--hidden-import core.build_info \
+ --hidden-import mediapipe \
--add-data src/ui/dark_theme.qss:. \
--add-data assets/app_icon.ico:. \
--add-data assets/app_icon.png:. \
@@ -236,6 +240,8 @@ jobs:
--hidden-import torch \
--hidden-import sklearn \
--hidden-import sentence_transformers \
+ --collect-data mediapipe \
+ --collect-data pyiqa \
--add-data models:models \
src/main.py
diff --git a/README.md b/README.md
index fdaa13d..9b7ffe8 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ PhotoSort is a powerful desktop application focused on speed designed to streaml
* **Fast Processing**: Intensive operations (scanning, thumbnailing, analysis) run once in batch to ensure fast image scrolling.
* **Optimized Image Handling**: Supports a wide range of formats, including various RAW types, with efficient caching.
* **Intelligent Image Rotation**: Smart rotation system that automatically tries lossless metadata rotation first, with optional fallback to pixel rotation when needed.
- * **AI Best-Shot Ranking**: Compare stacks with either the bundled multi-model pipeline or an OpenAI-compatible vision model (e.g. Qwen3-VL).
+* **AI Best-Shot Ranking**: Compare stacks with either the bundled MUSIQ/MANIQA/LIQE pipeline or an OpenAI-compatible vision model (e.g. Qwen3-VL).
* **AI Star Ratings**: Ask the configured AI engine to score individual photos with 1–5 stars.
- **Update Notifications**: Automatically checks for new releases and notifies users when updates are available, with direct download links.
@@ -50,10 +50,12 @@ https://github.com/duartebarbosadev/photosort/releases
> **Note**: These dependencies are only required on macOS. Windows and Linux users can skip this step.
-3. **Create a virtual environment (recommended):**
+> **Python version:** PhotoSort currently targets Python 3.12 because several dependencies (e.g., MediaPipe) do not yet ship wheels for newer interpreters.
+
+3. **Create a Python 3.12 virtual environment (recommended):**
```bash
- python -m venv venv
+ python3.12 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
```
@@ -102,25 +104,15 @@ The application will automatically detect and load the model when you use the ro
### AI Best Shot Ranking & Engines
-PhotoSort can rank similar shots and assign AI ratings using either a local
-multi-model pipeline or an OpenAI-compatible vision model; switch engines in
+PhotoSort can rank similar shots and assign AI ratings using either the local
+MUSIQ/MANIQA/LIQE pipeline or an OpenAI-compatible vision model; switch engines in
**Preferences → AI Rating Engine** (`F10`). Settings persist between sessions.
**Local pipeline (default)**
-Runs entirely offline with three Hugging Face checkpoints:
-BlazeFace face detector (`qualcomm/MediaPipe-Face-Detection`), eye-state classifier
-(`MichalMlodawski/open-closed-eye-classification-mobilev2`), and the aesthetic predictor
-(`shunk031/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE`). Place each bundle
-under `models/` and choose **Local Pipeline** in preferences.
-
-Required downloads (install into `models/`):
-
-1. **Face detector** – [`qualcomm/MediaPipe-Face-Detection`](https://huggingface.co/qualcomm/MediaPipe-Face-Detection)
- Extract `model.onnx` to `models/job_*/model.onnx` (or e.g. `models/MediaPipe-Face-Detection_FaceDetector_float/model.onnx`).
-2. **Eye-state classifier** – [`MichalMlodawski/open-closed-eye-classification-mobilev2`](https://huggingface.co/MichalMlodawski/open-closed-eye-classification-mobilev2)
- Copy all files into `models/open-closed-eye-classification-mobilev2/`.
-3. **Aesthetic predictor** – [`shunk031/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE`](https://huggingface.co/shunk031/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE)
- Copy all files into `models/aesthetic_predictor/` (includes the CLIP backbone plus regression head).
+Runs entirely offline by blending three state-of-the-art no-reference IQA models:
+**MUSIQ**, **MANIQA**, and **LIQE**. These metrics are loaded
+through [`pyiqa`](https://github.com/chaofengc/IQA-PyTorch); no manual model
+downloads are required.
**LLM engine**
Connect PhotoSort to any OpenAI-compatible endpoint that accepts images
@@ -138,8 +130,7 @@ the API key blank.
- **AI star ratings**: To score every visible image, run **View → AI Rate Images**
(`Ctrl+A`). The ratings are stored in your XMP sidecars/metadata cache so
they survive reloads, and you can filter the library using the standard rating
- controls. (Detailed breakdowns from the AI response are kept internally for future
- UI integrations.)
+ controls.
### Exporting Logs
diff --git a/assets/keyboard-layout.html b/assets/keyboard-layout.html
index 83db7c7..dd6e363 100644
--- a/assets/keyboard-layout.html
+++ b/assets/keyboard-layout.html
@@ -785,7 +785,7 @@
Photosort Shortcuts (ctrl/cmd)
X
C
V
- BAnalyze Best Shots (Ctrl+B)
+ BAnalyze Best Shots
N
M
, <
@@ -946,7 +946,7 @@ Photosort Shortcuts (shift)
X
C
V
- BAnalyze Best Shots (Selected)
+ BAnalyze Best Shots (Images Selected)
NDecline Rotation Suggestions
M
, <
diff --git a/assets/keyboard-layout.png b/assets/keyboard-layout.png
index ae44df6..2877714 100644
Binary files a/assets/keyboard-layout.png and b/assets/keyboard-layout.png differ
diff --git a/requirements-cuda.txt b/requirements-cuda.txt
index 18a06d9..da5ebb4 100644
--- a/requirements-cuda.txt
+++ b/requirements-cuda.txt
@@ -11,4 +11,6 @@ opencv-python
pyexiv2
piexif
onnxruntime-gpu
-torchvision
\ No newline at end of file
+torchvision
+pyiqa
+mediapipe
diff --git a/requirements.txt b/requirements.txt
index 2c3d2c2..27d1ec3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -14,3 +14,5 @@ pyexiv2
piexif
onnxruntime
torchvision
+pyiqa
+mediapipe
diff --git a/src/core/ai/__init__.py b/src/core/ai/__init__.py
index 94b58cf..f182638 100644
--- a/src/core/ai/__init__.py
+++ b/src/core/ai/__init__.py
@@ -1,9 +1,7 @@
-"""
-AI helper utilities for advanced ranking/scoring pipelines.
+"""AI helper utilities for best-shot ranking and scoring."""
-Currently exposes the experimental best-photo selector which chains together
-multiple pre-trained models (face detection, eye-state classification, and
-image quality scoring) to rank similar shots.
-"""
-
-from .best_photo_selector import BestPhotoSelector, BestShotResult # noqa: F401
+from .best_photo_selector import ( # noqa: F401
+ BestPhotoSelector,
+ BestShotResult,
+ MetricSpec,
+)
diff --git a/src/core/ai/best_photo_selector.py b/src/core/ai/best_photo_selector.py
index 22ff170..6f221f3 100644
--- a/src/core/ai/best_photo_selector.py
+++ b/src/core/ai/best_photo_selector.py
@@ -1,44 +1,32 @@
-"""
-Experimental multi-model pipeline that ranks similar shots by overall quality.
-
-Pipeline overview
------------------
-1. **Face detection** (qualcomm/MediaPipe-Face-Detection ONNX) is used to locate
- the primary subject plus the six BlazeFace keypoints.
-2. **Eye-state classification** (MichalMlodawski/open-closed-eye-classification-mobilev2)
- determines whether the subject's eyes are open.
-3. **Technical + aesthetic scoring** relies on the CLIP-based
- `shunk031/aesthetics-predictor` head. The predictor produces an aesthetic
- score and normalized CLIP embeddings for every image/crop, which are then
- used for framing analysis (cosine similarity between full image and face
- crops) plus the downstream composite ranking.
-
-Every metric is normalized to `[0, 1]` and combined via a simple weighting
-scheme, prioritising sharp, open-eye photos over purely aesthetic scores. The
-implementation is intentionally modular so that the UI or future automation
-can inject mocked detectors for tests or swap in custom weighting profiles.
-
-The bundled BlazeFace anchor tensor originates from MediaPipePyTorch
-(Apache License 2.0). The aesthetic head is based on the open-source model by
-shunk031 (Apache 2.0).
+"""Best-shot ranking powered by MUSIQ, MANIQA, and LIQE.
+
+The pipeline relies on modern no-reference IQA models provided by `pyiqa`.
+Each metric produces an independent quality estimate, which we normalize
+and blend to obtain a composite score for every image in a similarity
+cluster.
"""
from __future__ import annotations
-import importlib.util
import logging
import os
-import sys
-import types
-from dataclasses import dataclass, field
-from typing import Callable, Dict, Iterable, List, Optional, Sequence, Tuple
+import threading
+from dataclasses import dataclass, field, replace
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple
+from urllib.parse import urlparse
-import cv2 # type: ignore
import numpy as np
from PIL import Image, ImageOps
-logger = logging.getLogger(__name__)
+from core.app_settings import get_local_best_shot_constants
+from core.image_processing.raw_image_processor import (
+ RawImageProcessor,
+ is_raw_extension,
+)
+from core.numpy_compat import ensure_numpy_sctypes
+
+logger = logging.getLogger(__name__)
PROJECT_ROOT = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..", "..")
@@ -47,503 +35,412 @@
"PHOTOSORT_MODELS_DIR", os.path.join(PROJECT_ROOT, "models")
)
-SUPPORTED_IMAGE_EXTENSIONS = {
- ".jpg",
- ".jpeg",
- ".png",
- ".bmp",
- ".tif",
- ".tiff",
- ".webp",
- ".heif",
- ".heic",
-}
-
-DEFAULT_COMPOSITE_WEIGHTS = {
- "eyes_open": 0.35,
- "technical": 0.25,
- "aesthetic": 0.25,
- "framing": 0.15,
-}
-
-# Anchor tensor copied from MediaPipePyTorch (Apache 2.0).
-ANCHOR_RESOURCE_PATH = os.path.join(
- os.path.dirname(__file__), "data", "blazeface_anchors.npy"
-)
+_PYIQA_DOWNLOAD_LOCK = threading.Lock()
+_METRIC_CACHE_LOCK = threading.Lock()
+_METRIC_CACHE: Dict[Tuple[str, str], Tuple[Any, threading.Lock]] = {}
+DEFAULT_EYE_OPEN_WEIGHT = 0.35
+
+if hasattr(Image, "Resampling"):
+ _RESAMPLE_LANCZOS = Image.Resampling.LANCZOS
+else: # pragma: no cover - Pillow < 10 fallback
+ _RESAMPLE_LANCZOS = Image.LANCZOS
+
+MANIQA_SAFE_INPUT = 224
+
+ensure_numpy_sctypes()
+
+
+class EyeStateAnalyzer:
+ """Estimates eye openness using MediaPipe Face Mesh landmarks."""
+
+ _LEFT_LANDMARKS = {
+ "upper": 159,
+ "lower": 145,
+ "outer": 33,
+ "inner": 133,
+ }
+ _RIGHT_LANDMARKS = {
+ "upper": 386,
+ "lower": 374,
+ "outer": 263,
+ "inner": 362,
+ }
+
+ def __init__(self, max_faces: int = 1) -> None:
+ import mediapipe as mp # type: ignore
+
+ self._mp_face_mesh = mp.solutions.face_mesh.FaceMesh(
+ static_image_mode=True,
+ max_num_faces=max_faces,
+ refine_landmarks=True,
+ min_detection_confidence=0.5,
+ min_tracking_confidence=0.5,
+ )
+ def predict_open_probability(self, image: Image.Image) -> Optional[float]:
+ arr = np.array(image.convert("RGB"))
+ if arr.ndim != 3 or arr.shape[2] != 3:
+ return None
+        # Mark the array non-writeable so MediaPipe can treat it as read-only (performance)
+ arr.flags.writeable = False
+ results = self._mp_face_mesh.process(arr)
+ if not results.multi_face_landmarks:
+ return None
+ height, width, _ = arr.shape
+ scores: List[float] = []
+ for face_landmarks in results.multi_face_landmarks[:1]:
+ ratio_left = self._compute_ratio(
+ face_landmarks.landmark, width, height, self._LEFT_LANDMARKS
+ )
+ ratio_right = self._compute_ratio(
+ face_landmarks.landmark, width, height, self._RIGHT_LANDMARKS
+ )
+ for ratio in (ratio_left, ratio_right):
+ if ratio is not None:
+ scores.append(self._ratio_to_probability(ratio))
+ if not scores:
+ return None
+ return float(sum(scores) / len(scores))
-def _first_existing_path(candidates: Iterable[str]) -> Optional[str]:
- for candidate in candidates:
- if candidate and os.path.exists(candidate):
- return candidate
- return None
+ @staticmethod
+ def _compute_ratio(
+ landmarks, width: int, height: int, indices: Dict[str, int]
+ ) -> Optional[float]:
+ max_index = max(indices.values(), default=-1)
+ if max_index >= 0 and hasattr(landmarks, "__len__"):
+ try:
+ if len(landmarks) <= max_index:
+ # Not enough landmarks to satisfy the requested indices.
+ return None
+ except TypeError:
+ return None
+ try:
+ upper = landmarks[indices["upper"]]
+ lower = landmarks[indices["lower"]]
+ outer = landmarks[indices["outer"]]
+ inner = landmarks[indices["inner"]]
+ except (IndexError, KeyError): # pragma: no cover - defensive guard
+ return None
+ vertical = abs(upper.y - lower.y)
+ horizontal = abs(outer.x - inner.x)
+ if horizontal <= 0:
+ return None
+ return vertical / horizontal
-def _clamp(value: float, min_value: float = 0.0, max_value: float = 1.0) -> float:
- return float(max(min_value, min(max_value, value)))
+ @staticmethod
+ def _ratio_to_probability(ratio: float) -> float:
+ closed_threshold = 0.18
+ open_threshold = 0.28
+ if ratio <= closed_threshold:
+ return 0.0
+ if ratio >= open_threshold:
+ return 1.0
+ span = open_threshold - closed_threshold
+ return (ratio - closed_threshold) / span if span > 0 else 0.0
-def _cosine_similarity(vec_a: np.ndarray, vec_b: np.ndarray) -> float:
- denom = float(np.linalg.norm(vec_a) * np.linalg.norm(vec_b))
- if denom == 0.0:
- return 0.0
- return float(np.dot(vec_a, vec_b) / denom)
+def _clamp(value: float, *, minimum: float = 0.0, maximum: float = 1.0) -> float:
+ return float(max(minimum, min(maximum, value)))
-def _default_focus_score(image: Image.Image) -> float:
- """Normalized Laplacian variance focus metric."""
- gray = image.convert("L")
- arr = np.array(gray, dtype=np.uint8)
- if arr.size == 0:
- return 0.0
- try:
- variance = cv2.Laplacian(arr, cv2.CV_64F).var()
- except cv2.error as exc:
- logger.warning("Laplacian focus metric failed: %s", exc)
+def _normalize(value: float, lower: float, upper: float) -> float:
+ if upper <= lower:
return 0.0
- return float(variance / (variance + 300.0))
+ return _clamp((value - lower) / (upper - lower))
-@dataclass
-class FaceDetectionResult:
- score: float
- bbox: Tuple[int, int, int, int] # (left, top, right, bottom) in pixels
- bbox_normalized: Tuple[float, float, float, float] # (ymin, xmin, ymax, xmax)
- keypoints: List[Tuple[float, float]] # normalized x/y pairs
- image_size: Tuple[int, int]
+_LOCAL_BEST_SHOT_SETTINGS = get_local_best_shot_constants()
- def crop_face(self, image: Image.Image) -> Image.Image:
- return image.crop(self.bbox).copy()
- def to_dict(self) -> Dict[str, object]:
- return {
- "score": self.score,
- "bbox": self.bbox,
- "bbox_normalized": self.bbox_normalized,
- "image_size": self.image_size,
- "keypoints": self.keypoints,
- }
+def _pil_to_tensor(image: Image.Image):
+ try:
+ import torch # type: ignore
+ except ImportError as exc: # pragma: no cover - torch is a hard dependency
+ raise RuntimeError(
+ "torch is required for IQA scoring. Install it via `pip install torch`."
+ ) from exc
+
+ cache_key = _LOCAL_BEST_SHOT_SETTINGS.tensor_cache_key
+ cache_dict = image.info if isinstance(getattr(image, "info", None), dict) else None
+ if cache_dict is not None:
+ cached = cache_dict.get(cache_key)
+ if cached is not None:
+ return cached
+
+ if image.mode != "RGB":
+ image = image.convert("RGB")
+ arr = np.asarray(image, dtype=np.float32)
+ if arr.ndim == 2: # grayscale image
+ arr = np.stack([arr, arr, arr], axis=-1)
+ if arr.shape[2] == 4: # RGBA → RGB
+ arr = arr[:, :, :3]
+ arr = _pad_to_model_stride(arr, _LOCAL_BEST_SHOT_SETTINGS.model_stride)
+ arr /= 255.0
+ tensor = torch.from_numpy(arr).permute(2, 0, 1).unsqueeze(0).contiguous()
+ if cache_dict is not None:
+ cache_dict[cache_key] = tensor
+ return tensor
+
+
+def _pad_to_model_stride(arr: np.ndarray, stride: int) -> np.ndarray:
+ if stride <= 1 or arr.ndim != 3:
+ return arr
+ height, width, _ = arr.shape
+ pad_h = (-height) % stride
+ pad_w = (-width) % stride
+ if pad_h == 0 and pad_w == 0:
+ return arr
+ return np.pad(arr, ((0, pad_h), (0, pad_w), (0, 0)), mode="edge")
+
+
+@dataclass(frozen=True)
+class MetricSpec:
+ name: str
+ weight: float = 1.0
+ min_score: float = 0.0
+ max_score: float = 100.0
+
+
+DEFAULT_METRIC_SPECS: Sequence[MetricSpec] = (
+ MetricSpec(name="musiq", weight=0.45, min_score=0.0, max_score=100.0),
+ MetricSpec(name="maniqa", weight=0.3, min_score=0.0, max_score=1.0),
+ MetricSpec(name="liqe", weight=0.25, min_score=0.0, max_score=100.0),
+)
-@dataclass
-class QualityScore:
- raw: float
- normalized: float
- embedding: Optional[np.ndarray] = None
+MetricScoreFn = Callable[[Image.Image], float]
@dataclass
-class BestShotResult:
- image_path: str
- composite_score: float
- metrics: Dict[str, float] = field(default_factory=dict)
- raw_metrics: Dict[str, float] = field(default_factory=dict)
- face: Optional[FaceDetectionResult] = None
-
- def to_dict(self) -> Dict[str, object]:
- payload: Dict[str, object] = {
- "image_path": self.image_path,
- "composite_score": self.composite_score,
- "metrics": self.metrics,
- "raw_metrics": self.raw_metrics,
+class IQAMetricRunner:
+ spec: MetricSpec
+ scorer: Optional[MetricScoreFn] = None
+ device_hint: Optional[str] = None
+ status_callback: Optional[Callable[[str], None]] = None
+
+ def evaluate(self, image: Image.Image) -> Optional[Dict[str, float]]:
+ scorer = self._ensure_scorer()
+ if scorer is None:
+ return None
+ raw = float(scorer(image))
+ normalized = _normalize(raw, self.spec.min_score, self.spec.max_score)
+ return {
+ "raw": raw,
+ "normalized": normalized,
}
- if self.face:
- payload["face"] = self.face.to_dict()
- return payload
-
-class BlazeFaceDetector:
- """Thin wrapper around the Qualcomm MediaPipe face detector (ONNX)."""
-
- def __init__(
- self,
- models_root: Optional[str] = None,
- model_path: Optional[str] = None,
- min_score: float = 0.6,
- iou_threshold: float = 0.3,
- max_faces: int = 5,
- ):
- self.models_root = models_root or DEFAULT_MODELS_ROOT
- self.model_path = model_path or _first_existing_path(
- [
- os.path.join(
- self.models_root, "job_jgzjewkop_optimized_onnx", "model.onnx"
- ),
- os.path.join(
- self.models_root,
- "MediaPipe-Face-Detection_FaceDetector_float",
- "model.onnx",
- ),
- ]
- )
- self.min_score = min_score
- self.iou_threshold = iou_threshold
- self.max_faces = max_faces
+ def _ensure_scorer(self) -> Optional[MetricScoreFn]:
+ if self.scorer is None:
+ self.scorer = self._build_pyiqa_scorer()
+ return self.scorer
- self._session = None
- self._input_name: Optional[str] = None
- self._output_names: Optional[List[str]] = None
- self._anchors: Optional[np.ndarray] = None
-
- def _ensure_ready(self):
- if self._session is not None:
- return
- if not self.model_path:
- raise FileNotFoundError(
- "Face detector ONNX model not found. Expected it under the 'models/' "
- "folder (e.g. job_*_onnx/model.onnx from qualcomm/MediaPipe-Face-Detection)."
- )
+ def _build_pyiqa_scorer(self) -> MetricScoreFn:
try:
- import onnxruntime as ort # type: ignore
- except ImportError as exc: # pragma: no cover - environment specific
- raise RuntimeError("onnxruntime is required for face detection") from exc
-
- providers = ["CPUExecutionProvider"]
- self._session = ort.InferenceSession(self.model_path, providers=providers)
- inputs = self._session.get_inputs()
- outputs = self._session.get_outputs()
- self._input_name = inputs[0].name
- self._output_names = [out.name for out in outputs]
-
- anchors_path = (
- os.path.join(self.models_root, "blazeface_anchors.npy")
- if os.path.exists(os.path.join(self.models_root, "blazeface_anchors.npy"))
- else ANCHOR_RESOURCE_PATH
- )
- if not os.path.exists(anchors_path):
- raise FileNotFoundError(
- "BlazeFace anchors file missing. Expected either "
- f"{anchors_path} or models/blazeface_anchors.npy."
- )
- self._anchors = np.load(anchors_path).astype(np.float32)
-
- def detect_faces(
- self,
- image: Image.Image,
- image_path: Optional[str] = None,
- max_faces: Optional[int] = None,
- ) -> List[FaceDetectionResult]:
- self._ensure_ready()
- assert self._session is not None
- assert self._input_name is not None
- assert self._output_names is not None
- assert self._anchors is not None
-
- width, height = image.size
- np_img = np.asarray(image.convert("RGB"), dtype=np.float32) / 255.0
- resized = cv2.resize(np_img, (256, 256), interpolation=cv2.INTER_AREA)
- tensor = np.transpose(resized, (2, 0, 1))[None, ...]
-
- outputs = self._session.run(self._output_names, {self._input_name: tensor})
- box_coords = np.concatenate(outputs[:2], axis=1)[0]
- box_scores = np.concatenate(outputs[2:], axis=1)[0, :, 0]
- box_scores = 1.0 / (1.0 + np.exp(-box_scores))
-
- decoded = self._decode_boxes(box_coords, self._anchors)
- mask = box_scores >= self.min_score
- decoded = decoded[mask]
- scores = box_scores[mask]
-
- if decoded.size == 0:
- return []
+ import torch # type: ignore
+ import pyiqa # type: ignore
+ import pyiqa.utils.download_util as download_util # type: ignore
+ except ImportError as exc: # pragma: no cover - import guarded for tests
+ raise RuntimeError(
+ "pyiqa is required for the MUSIQ/MANIQA/LIQE pipeline."
+ " Install it with `pip install pyiqa`."
+ ) from exc
- keep_indices = self._weighted_nms(decoded[:, :4], scores, max_faces)
- results: List[FaceDetectionResult] = []
- for idx in keep_indices:
- box = decoded[idx, :4]
- keypoints = decoded[idx, 4:].reshape(-1, 2).tolist()
- ymin, xmin, ymax, xmax = [float(_clamp(v)) for v in box]
- left = int(round(xmin * width))
- top = int(round(ymin * height))
- right = int(round(xmax * width))
- bottom = int(round(ymax * height))
- if right <= left or bottom <= top:
- continue
- results.append(
- FaceDetectionResult(
- score=float(scores[idx]),
- bbox=(left, top, right, bottom),
- bbox_normalized=(ymin, xmin, ymax, xmax),
- keypoints=keypoints,
- image_size=(width, height),
- )
- )
- return results
+ if self.device_hint is not None:
+ device = torch.device(self.device_hint)
+ else:
+ device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
- @staticmethod
- def _decode_boxes(raw_boxes: np.ndarray, anchors: np.ndarray) -> np.ndarray:
- x_scale = 128.0
- y_scale = 128.0
- h_scale = 128.0
- w_scale = 128.0
-
- boxes = np.zeros_like(raw_boxes)
- x_center = raw_boxes[:, 0] / x_scale * anchors[:, 2] + anchors[:, 0]
- y_center = raw_boxes[:, 1] / y_scale * anchors[:, 3] + anchors[:, 1]
- w = raw_boxes[:, 2] / w_scale * anchors[:, 2]
- h = raw_boxes[:, 3] / h_scale * anchors[:, 3]
-
- boxes[:, 0] = y_center - h / 2.0
- boxes[:, 1] = x_center - w / 2.0
- boxes[:, 2] = y_center + h / 2.0
- boxes[:, 3] = x_center + w / 2.0
-
- for k in range(6):
- offset = 4 + k * 2
- boxes[:, offset] = (
- raw_boxes[:, offset] / x_scale * anchors[:, 2] + anchors[:, 0]
- )
- boxes[:, offset + 1] = (
- raw_boxes[:, offset + 1] / y_scale * anchors[:, 3] + anchors[:, 1]
- )
+ cache_key = (self.spec.name, str(device))
- return boxes
+ with _METRIC_CACHE_LOCK:
+ cached = _METRIC_CACHE.get(cache_key)
+ if cached is None:
- def _weighted_nms(
- self, boxes: np.ndarray, scores: np.ndarray, max_faces: Optional[int]
- ) -> List[int]:
- order = scores.argsort()[::-1]
- keep: List[int] = []
- max_candidates = max_faces or self.max_faces
+ def _factory():
+ return pyiqa.create_metric(
+ self.spec.name,
+ device=device,
+ as_loss=False,
+ )
- while order.size > 0 and len(keep) < max_candidates:
- idx = order[0]
- keep.append(int(idx))
- if order.size == 1:
- break
- ious = self._iou(boxes[idx], boxes[order[1:]])
- order = order[1:][ious < self.iou_threshold]
- return keep
+ if self.status_callback is None:
+ metric = _factory()
+ else:
+ metric = self._with_download_notifications(download_util, _factory)
+ metric.eval()
+ cached = (metric, threading.Lock())
+ _METRIC_CACHE[cache_key] = cached
- @staticmethod
- def _iou(box: np.ndarray, others: np.ndarray) -> np.ndarray:
- ymin = np.maximum(box[0], others[:, 0])
- xmin = np.maximum(box[1], others[:, 1])
- ymax = np.minimum(box[2], others[:, 2])
- xmax = np.minimum(box[3], others[:, 3])
+ metric, metric_lock = cached
+ torch_module = torch
- inter = np.maximum(0.0, ymax - ymin) * np.maximum(0.0, xmax - xmin)
- box_area = (box[2] - box[0]) * (box[3] - box[1])
- other_area = (others[:, 2] - others[:, 0]) * (others[:, 3] - others[:, 1])
- union = box_area + other_area - inter + 1e-6
- return inter / union
+ def _run_metric(input_tensor):
+ with torch_module.no_grad():
+ output = metric(input_tensor)
+ return float(output.item()) if hasattr(output, "item") else float(output)
+ def _tensor_on_device(source: Image.Image):
+ return _pil_to_tensor(source).to(device)
-class EyeStateClassifier:
- """Wrapper around the MobilenetV2 eye open/closed classifier."""
+ def _score(image: Image.Image) -> float:
+ tensor = _tensor_on_device(image)
+ with metric_lock:
+ try:
+ value = _run_metric(tensor)
+ except Exception as exc:
+                        # MANIQA is known to raise a "list index out of range" error on some inputs; fall back by re-cropping.
+ if (
+ self.spec.name != "maniqa"
+ or "list index out of range" not in str(exc).lower()
+ ):
+ raise
+ logger.debug(
+ "MANIQA failed on %s; retrying with %dx%d center crop",
+ image.info.get("source_path", ""),
+ MANIQA_SAFE_INPUT,
+ MANIQA_SAFE_INPUT,
+ )
+ safe_image = ImageOps.fit(
+ image,
+ (MANIQA_SAFE_INPUT, MANIQA_SAFE_INPUT),
+ method=_RESAMPLE_LANCZOS,
+ centering=(0.5, 0.5),
+ )
+ tensor = _tensor_on_device(safe_image)
+ value = _run_metric(tensor)
+ return value
+
+ return _score
+
+ def _with_download_notifications(self, download_util, factory):
+ original_loader = download_util.load_file_from_url
+
+ def wrapped_loader(url, model_dir=None, progress=True, file_name=None):
+ target_dir = model_dir or download_util.DEFAULT_CACHE_DIR
+ filename = file_name or os.path.basename(urlparse(url).path)
+ destination = os.path.abspath(os.path.join(target_dir, filename))
+ should_notify = not os.path.exists(destination)
+ if should_notify:
+ self._report_download_status("start", destination)
+ try:
+ return original_loader(
+ url,
+ model_dir=model_dir,
+ progress=progress,
+ file_name=file_name,
+ )
+ finally:
+ if should_notify:
+ self._report_download_status("done", destination)
- def __init__(self, model_dir: Optional[str] = None):
- self.model_dir = model_dir or os.path.join(
- DEFAULT_MODELS_ROOT, "open-closed-eye-classification-mobilev2"
- )
- if not os.path.isdir(self.model_dir):
- raise FileNotFoundError(
- "Eye classifier checkpoint not found. "
- "Download MichalMlodawski/open-closed-eye-classification-mobilev2 "
- "into the 'models/open-closed-eye-classification-mobilev2' folder."
- )
- self._device = None
- self._processor = None
- self._model = None
- self._ensure_ready()
+ with _PYIQA_DOWNLOAD_LOCK:
+ download_util.load_file_from_url = wrapped_loader
+ try:
+ return factory()
+ finally:
+ download_util.load_file_from_url = original_loader
- def _ensure_ready(self):
- if self._model is not None:
+ def _report_download_status(self, stage: str, destination: str) -> None:
+ if not self.status_callback:
return
- try:
- import torch # type: ignore
- from transformers import ( # type: ignore
- AutoImageProcessor,
- MobileNetV2ForImageClassification,
+ friendly_metric = self.spec.name.upper()
+ target = os.path.expanduser(destination)
+ if stage == "start":
+ message = (
+ f"Downloading {friendly_metric} weights to {target}. "
+ "Progress also appears in the log window."
)
- except ImportError as exc: # pragma: no cover
- raise RuntimeError(
- "transformers and torch are required for the eye-state classifier"
- ) from exc
-
- self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
- self._processor = AutoImageProcessor.from_pretrained(
- self.model_dir, local_files_only=True
- )
- self._model = MobileNetV2ForImageClassification.from_pretrained(
- self.model_dir, local_files_only=True
- )
- self._model.to(self._device)
- self._model.eval()
-
- def predict_open_probability(
- self, eye_image: Image.Image, image_path: Optional[str] = None
- ) -> float:
- import torch # type: ignore
-
- assert (
- self._processor is not None
- and self._model is not None
- and self._device is not None
- )
- inputs = self._processor(images=eye_image, return_tensors="pt")
- inputs = {k: v.to(self._device) for k, v in inputs.items()}
- with torch.no_grad():
- logits = self._model(**inputs).logits
- probs = torch.softmax(logits, dim=-1)
- # Class index 1 == eyes open
- return float(probs[0, 1].item())
-
-
-class QualityFusionModel:
- """Wraps the local AestheticsPredictor V2 model for scoring + embeddings."""
-
- def __init__(
- self,
- models_root: Optional[str] = None,
- predictor_dir: Optional[str] = None,
- ):
- models_root = models_root or DEFAULT_MODELS_ROOT
- self.predictor_dir = predictor_dir or os.path.join(
- models_root, "aesthetic_predictor"
- )
- if not os.path.isdir(self.predictor_dir):
- raise FileNotFoundError(
- "Aesthetic predictor not found. "
- "Download shunk031/aesthetics-predictor-v2 (linear) into "
- "models/aesthetic_predictor."
- )
- self._package_name = (
- f"photosort_aesthetic_predictor_{abs(hash(self.predictor_dir))}"
- )
- if self._package_name not in sys.modules:
- package = types.ModuleType(self._package_name)
- package.__path__ = [self.predictor_dir]
- sys.modules[self._package_name] = package
- self._device = None
- self._processor = None
- self._model = None
- self._load_predictor()
-
- def _load_local_module(self, module_name: str, file_path: str):
- spec = importlib.util.spec_from_file_location(module_name, file_path)
- if spec is None or spec.loader is None:
- raise ImportError(f"Could not load module from {file_path}")
- module = importlib.util.module_from_spec(spec)
- sys.modules[module_name] = module
- spec.loader.exec_module(module)
- return module
-
- def _load_predictor(self):
- import torch # type: ignore
- from safetensors.torch import load_file # type: ignore
- from transformers import CLIPImageProcessor # type: ignore
-
- package_prefix = self._package_name
- config_module = self._load_local_module(
- f"{package_prefix}.configuration_predictor",
- os.path.join(self.predictor_dir, "configuration_predictor.py"),
- )
- model_module = self._load_local_module(
- f"{package_prefix}.modeling_v2",
- os.path.join(self.predictor_dir, "modeling_v2.py"),
- )
- AestheticsPredictorConfig = getattr(config_module, "AestheticsPredictorConfig")
- PredictorModel = getattr(model_module, "AestheticsPredictorV2Linear")
-
- config = AestheticsPredictorConfig.from_pretrained(self.predictor_dir)
- model = PredictorModel(config)
- state_dict = load_file(os.path.join(self.predictor_dir, "model.safetensors"))
- model.load_state_dict(state_dict, strict=False)
-
- self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
- self._model = model.to(self._device)
- self._model.eval()
- self._processor = CLIPImageProcessor.from_pretrained(
- self.predictor_dir, local_files_only=True
- )
+ else:
+ message = f"{friendly_metric} weights cached at {target}."
+ self.status_callback(message)
- def score(self, image: Image.Image, return_embedding: bool = False) -> QualityScore:
- import torch # type: ignore
- assert self._model is not None
- assert self._processor is not None
- assert self._device is not None
+@dataclass
+class BestShotResult:
+ image_path: str
+ composite_score: float
+ metrics: Dict[str, float] = field(default_factory=dict)
+ raw_metrics: Dict[str, float] = field(default_factory=dict)
- inputs = self._processor(images=image, return_tensors="pt")
- pixel_values = inputs["pixel_values"].to(self._device)
- with torch.no_grad():
- outputs = self._model(
- pixel_values=pixel_values,
- return_dict=True,
- )
- logits = outputs.logits
- embedding_tensor = outputs.hidden_states
-
- raw = float(logits.squeeze().item())
- normalized = _clamp((raw - 1.0) / 9.0)
- embedding_np = (
- embedding_tensor.squeeze().detach().cpu().numpy()
- if return_embedding
- else None
- )
- return QualityScore(raw=raw, normalized=normalized, embedding=embedding_np)
+ def to_dict(self) -> Dict[str, object]:
+ return {
+ "image_path": self.image_path,
+ "composite_score": self.composite_score,
+ "metrics": self.metrics,
+ "raw_metrics": self.raw_metrics,
+ }
class BestPhotoSelector:
- """High-level orchestrator that ranks images by composite quality."""
+ """Ranks images by blending multiple no-reference IQA scores."""
def __init__(
self,
- face_detector: Optional[BlazeFaceDetector] = None,
- eye_classifier: Optional[EyeStateClassifier] = None,
- quality_model: Optional[QualityFusionModel] = None,
+ face_detector=None, # Legacy arguments kept for backwards compatibility
+ eye_classifier=None,
+ quality_model=None,
models_root: Optional[str] = None,
weights: Optional[Dict[str, float]] = None,
image_loader: Optional[Callable[[str], Image.Image]] = None,
- focus_metric_fn: Optional[Callable[[Image.Image], float]] = None,
+ focus_metric_fn=None,
+ metric_specs: Optional[Sequence[MetricSpec]] = None,
+ metric_factories: Optional[Dict[str, MetricScoreFn]] = None,
+ device: Optional[str] = None,
+ status_callback: Optional[Callable[[str], None]] = None,
+ eye_state_analyzer: Optional[EyeStateAnalyzer] = None,
+ enable_eye_detection: bool = True,
):
- self.models_root = models_root or DEFAULT_MODELS_ROOT
- self.face_detector = face_detector
- self.eye_classifier = eye_classifier
- self.quality_model = quality_model
- self.weights = weights or DEFAULT_COMPOSITE_WEIGHTS
- self._image_loader = image_loader or self._default_loader
- self._focus_metric = focus_metric_fn or _default_focus_score
+ if any(
+ arg is not None
+ for arg in (face_detector, eye_classifier, quality_model, focus_metric_fn)
+ ):
+ logger.debug(
+ "Legacy detectors/classifiers are no longer used by the IQA pipeline."
+ )
- if self.face_detector is None:
- try:
- self.face_detector = BlazeFaceDetector(models_root=self.models_root)
- except FileNotFoundError as exc:
- logger.warning("Face detector disabled: %s", exc)
- if self.eye_classifier is None:
- try:
- self.eye_classifier = EyeStateClassifier(
- os.path.join(
- self.models_root, "open-closed-eye-classification-mobilev2"
- )
- )
- except FileNotFoundError as exc:
- logger.warning("Eye-state classifier disabled: %s", exc)
- except RuntimeError as exc:
- logger.warning(
- "Eye-state classifier disabled (install transformers>=4.30 and torch>=2.1 to enable eye scoring): %s",
- exc,
- )
- self.eye_classifier = None
- except Exception as exc: # pragma: no cover - defensive guard
- logger.warning(
- "Eye-state classifier initialisation failed: %s", exc, exc_info=True
- )
- self.eye_classifier = None
- if self.quality_model is None:
- try:
- self.quality_model = QualityFusionModel(models_root=self.models_root)
- except FileNotFoundError as exc:
- logger.error("Quality model unavailable: %s", exc)
- raise
+ self.models_root = models_root
+ self._image_loader = image_loader or self._default_loader
+ self._status_callback = status_callback
+
+ base_specs = metric_specs or DEFAULT_METRIC_SPECS
+ if not base_specs:
+ raise ValueError("At least one metric specification is required")
+
+ self._metric_runners: List[IQAMetricRunner] = []
+ self._metric_weights: Dict[str, float] = {}
+ factories = metric_factories or {}
+ for spec in base_specs:
+ adjusted_spec = (
+ replace(spec, weight=weights.get(spec.name, spec.weight))
+ if weights and spec.name in weights
+ else spec
+ )
+ runner = IQAMetricRunner(
+ spec=adjusted_spec,
+ scorer=factories.get(spec.name),
+ device_hint=device,
+ status_callback=self._status_callback,
+ )
+ self._metric_runners.append(runner)
+ self._metric_weights[adjusted_spec.name] = adjusted_spec.weight
+
+ self._eye_analyzer: Optional[EyeStateAnalyzer] = None
+ desired_eye_weight = (
+ weights.get("eyes_open", DEFAULT_EYE_OPEN_WEIGHT)
+ if weights and "eyes_open" in weights
+ else DEFAULT_EYE_OPEN_WEIGHT
+ )
+ if enable_eye_detection and desired_eye_weight > 0:
+ self._eye_analyzer = eye_state_analyzer or EyeStateAnalyzer()
+ self._metric_weights["eyes_open"] = desired_eye_weight
def rank_directory(
self, directory: str, recursive: bool = False
) -> List[BestShotResult]:
image_paths: List[str] = []
if recursive:
- for root, _, files in os.walk(directory):
+ for root, _, files in os.walk(directory): # pragma: no cover - convenience
for filename in files:
if self._is_supported_file(filename):
image_paths.append(os.path.join(root, filename))
@@ -556,177 +453,212 @@ def rank_directory(
def rank_images(self, image_paths: Sequence[str]) -> List[BestShotResult]:
results: List[BestShotResult] = []
for path in image_paths:
- result = self._analyze_image(path)
+ result = self.analyze_image(path)
if result:
results.append(result)
return sorted(results, key=lambda r: r.composite_score, reverse=True)
+ def analyze_image(self, image_path: str) -> Optional[BestShotResult]:
+ return self._analyze_image(image_path)
+
def _is_supported_file(self, filename: str) -> bool:
_, ext = os.path.splitext(filename)
- return ext.lower() in SUPPORTED_IMAGE_EXTENSIONS
+ return ext.lower() in {
+ ".jpg",
+ ".jpeg",
+ ".png",
+ ".bmp",
+ ".tif",
+ ".tiff",
+ ".webp",
+ ".heif",
+ ".heic",
+ }
def _default_loader(self, image_path: str) -> Image.Image:
with Image.open(image_path) as img:
prepared = ImageOps.exif_transpose(img).convert("RGB")
prepared.info["source_path"] = image_path
- prepared.info["region"] = "full"
return prepared.copy()
def _analyze_image(self, image_path: str) -> Optional[BestShotResult]:
try:
image = self._image_loader(image_path)
image.info.setdefault("source_path", image_path)
- image.info.setdefault("region", "full")
+ logger.info("Analyzing image through BestPhotoSelector: %s", image_path)
except Exception as exc:
logger.error("Failed to load %s: %s", image_path, exc)
return None
- assert self.quality_model is not None
- try:
- full_quality = self.quality_model.score(image, return_embedding=True)
- except Exception as exc:
- logger.error("Quality scoring failed for %s: %s", image_path, exc)
- image.close()
- return None
-
- metrics: Dict[str, float] = {"aesthetic": full_quality.normalized}
- raw_metrics: Dict[str, float] = {"quality_full_raw": full_quality.raw}
- focus_full = self._focus_metric(image)
- raw_metrics["focus_full"] = focus_full
-
- face_result: Optional[FaceDetectionResult] = None
- technical_score = focus_full
- framing_score: Optional[float] = None
- if self.face_detector:
+ metrics: Dict[str, float] = {}
+ raw_metrics: Dict[str, float] = {}
+ for runner in self._metric_runners:
try:
- detections = self.face_detector.detect_faces(
- image, image_path=image_path
+ payload = runner.evaluate(image)
+ except Exception as exc: # pragma: no cover - defensive logging
+ logger.warning(
+ "Metric %s failed for %s: %s",
+ runner.spec.name,
+ image_path,
+ exc,
)
- except Exception as exc:
- logger.warning("Face detection failed for %s: %s", image_path, exc)
- detections = []
- if detections:
- face_result = detections[0]
- face_crop = face_result.crop_face(image)
- face_crop.info["source_path"] = image_path
- face_crop.info["region"] = "face"
-
- focus_face = self._focus_metric(face_crop)
- raw_metrics["focus_face"] = focus_face
+ continue
+ if not payload:
+ continue
+ metrics[runner.spec.name] = payload["normalized"]
+ raw_metrics[f"{runner.spec.name}_raw"] = payload["raw"]
+
+ eye_prob = self._compute_eye_openness(image)
+ if eye_prob is not None:
+ metrics["eyes_open"] = eye_prob
+ raw_metrics["eyes_open_probability"] = eye_prob
+ logger.info(
+ "Eye openness for %s: %.3f",
+ os.path.basename(image_path),
+ eye_prob,
+ )
+ elif self._eye_analyzer is not None:
+ logger.debug(
+ "Eye analyzer could not determine probability for %s",
+ os.path.basename(image_path),
+ )
- try:
- face_quality = self.quality_model.score(
- face_crop, return_embedding=True
- )
- raw_metrics["quality_face_raw"] = face_quality.raw
- technical_score = 0.6 * focus_face + 0.4 * face_quality.normalized
- if (
- full_quality.embedding is not None
- and face_quality.embedding is not None
- ):
- framing_score = _clamp(
- (
- _cosine_similarity(
- full_quality.embedding, face_quality.embedding
- )
- + 1.0
- )
- / 2.0
- )
- except Exception as exc:
- logger.warning(
- "Subject quality scoring failed for %s: %s", image_path, exc
- )
- finally:
- face_crop.close()
-
- if self.eye_classifier:
- eye_crop = self._extract_eye_region(image, face_result)
- if eye_crop is not None:
- eye_crop.info["source_path"] = image_path
- eye_crop.info["region"] = "eyes"
- try:
- eyes_open_prob = (
- self.eye_classifier.predict_open_probability(
- eye_crop, image_path=image_path
- )
- )
- metrics["eyes_open"] = eyes_open_prob
- raw_metrics["eyes_open_probability"] = eyes_open_prob
- except Exception as exc:
- logger.warning(
- "Eye-state classification failed for %s: %s",
- image_path,
- exc,
- )
- finally:
- eye_crop.close()
-
- metrics["technical"] = _clamp(technical_score)
- if framing_score is not None:
- metrics["framing"] = framing_score
+ image.close()
+
+ if not metrics:
+ logger.error("All IQA metrics failed for %s", image_path)
+ return None
composite = self._combine_scores(metrics)
- result = BestShotResult(
+ return BestShotResult(
image_path=image_path,
composite_score=composite,
metrics=metrics,
raw_metrics=raw_metrics,
- face=face_result,
)
- image.close()
- return result
- def _combine_scores(self, metrics: Dict[str, float]) -> float:
+ def _compute_eye_openness(self, image: Image.Image) -> Optional[float]:
+ if self._eye_analyzer is None:
+ return None
+
+ def _predict(candidate: Image.Image) -> Optional[float]:
+ try:
+ value = self._eye_analyzer.predict_open_probability(candidate)
+ if value is None:
+ return None
+ return max(0.0, min(1.0, float(value)))
+ except Exception as exc: # pragma: no cover - defensive logging
+ logger.warning(
+ "Eye-state analysis failed for %s: %s",
+ candidate.info.get("source_path", ""),
+ exc,
+ )
+ return None
+
+ source_path = image.info.get("source_path")
+ disposable: List[Image.Image] = []
+ candidates: List[Image.Image] = [image]
+
+ def _append_candidate(candidate: Optional[Image.Image]) -> None:
+ if candidate is None:
+ return
+ candidates.append(candidate)
+ disposable.append(candidate)
+
+ # Center crops of the working preview sometimes help MediaPipe to focus on faces
+ for crop in self._build_eye_crops(image):
+ _append_candidate(crop)
+
+ if source_path:
+ fallback = self._load_eye_image(source_path)
+ _append_candidate(fallback)
+ if fallback is not None:
+ for crop in self._build_eye_crops(fallback):
+ _append_candidate(crop)
+
+ try:
+ for candidate in candidates:
+ result = _predict(candidate)
+ if result is not None:
+ return result
+ finally:
+ for extra in disposable:
+ try:
+ extra.close()
+ except Exception as exc:
+ logger.debug(
+ "Failed to close disposable eye candidate: %s",
+ exc,
+ exc_info=True,
+ )
+
+ return None
+
+ def _load_eye_image(self, source_path: str) -> Optional[Image.Image]:
+ normalized_path = os.path.normpath(source_path)
+ target_edge = _LOCAL_BEST_SHOT_SETTINGS.eye_fallback_max_edge
+ resolution = (target_edge, target_edge)
+ if is_raw_extension(os.path.splitext(normalized_path)[1].lower()):
+ preview = RawImageProcessor.process_raw_for_preview(
+ normalized_path,
+ apply_auto_edits=False,
+ preview_max_resolution=resolution,
+ )
+ if preview:
+ preview = ImageOps.exif_transpose(preview).convert("RGB")
+ preview.info.setdefault("source_path", source_path)
+ return preview
+ try:
+ with Image.open(source_path) as raw:
+ prepared = ImageOps.exif_transpose(raw).convert("RGB")
+ prepared.thumbnail(resolution, _RESAMPLE_LANCZOS)
+ buffered = prepared.copy()
+ buffered.info.setdefault("source_path", source_path)
+ return buffered
+ except Exception:
+ logger.debug(
+ "Fallback eye preview load failed for %s", source_path, exc_info=True
+ )
+ return None
+
+ def _build_eye_crops(self, image: Image.Image) -> List[Image.Image]:
+ width, height = image.size
+ if width < 80 or height < 80:
+ return []
+ scales = (0.85, 0.7)
+ vertical_bias = (0.35, 0.25)
+ crops: List[Image.Image] = []
+ source_path = image.info.get("source_path")
+ for scale, bias in zip(scales, vertical_bias):
+ crop_w = max(64, int(width * scale))
+ crop_h = max(64, int(height * scale))
+ left = max(0, (width - crop_w) // 2)
+ top = max(0, int((height - crop_h) * bias))
+ right = min(width, left + crop_w)
+ bottom = min(height, top + crop_h)
+ if right - left < 64 or bottom - top < 64:
+ continue
+ crop = image.crop((left, top, right, bottom)).copy()
+ max_edge = _LOCAL_BEST_SHOT_SETTINGS.eye_fallback_max_edge
+ if max(crop.size) > max_edge:
+ crop.thumbnail((max_edge, max_edge), _RESAMPLE_LANCZOS)
+ crop.info.setdefault("source_path", source_path)
+ crops.append(crop)
+ return crops
+
+ def _combine_scores(self, normalized_metrics: Dict[str, float]) -> float:
numerator = 0.0
denom = 0.0
- for key, weight in self.weights.items():
- if key in metrics:
- numerator += metrics[key] * weight
- denom += weight
+ for name, value in normalized_metrics.items():
+ weight = self._metric_weights.get(name, 1.0)
+ numerator += value * weight
+ denom += weight
return numerator / denom if denom else 0.0
- def _extract_eye_region(
- self,
- image: Image.Image,
- detection: FaceDetectionResult,
- padding_ratio: float = 0.35,
- ) -> Optional[Image.Image]:
- if len(detection.keypoints) < 2:
- return None
- width, height = detection.image_size
- right_eye = detection.keypoints[0]
- left_eye = detection.keypoints[1]
-
- xs = [right_eye[0], left_eye[0]]
- ys = [right_eye[1], left_eye[1]]
- x_min = min(xs)
- x_max = max(xs)
- if x_max <= x_min:
- return None
- eye_width = x_max - x_min
- pad_x = eye_width * padding_ratio
- pad_y = eye_width * (padding_ratio + 0.1)
- center_y = sum(ys) / len(ys)
-
- x0 = _clamp(x_min - pad_x)
- x1 = _clamp(x_max + pad_x)
- y0 = _clamp(center_y - pad_y)
- y1 = _clamp(center_y + pad_y)
-
- left = int(round(x0 * width))
- right = int(round(x1 * width))
- top = int(round(y0 * height))
- bottom = int(round(y1 * height))
-
- if right <= left or bottom <= top:
- return None
- return image.crop((left, top, right, bottom)).copy()
-
__all__ = [
"BestPhotoSelector",
"BestShotResult",
- "FaceDetectionResult",
- "QualityScore",
+ "MetricSpec",
+ "DEFAULT_MODELS_ROOT",
]
diff --git a/src/core/ai/best_shot_pipeline.py b/src/core/ai/best_shot_pipeline.py
index daee61e..8e22d32 100644
--- a/src/core/ai/best_shot_pipeline.py
+++ b/src/core/ai/best_shot_pipeline.py
@@ -4,19 +4,38 @@
import io
import json
import logging
+import math
import os
import re
import threading
+from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from enum import Enum
-from typing import Any, Dict, List, Optional, Sequence, Tuple, Set
-
-from PIL import Image, ImageDraw, ImageFont
-
-from core.ai.best_photo_selector import BestPhotoSelector
+from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Set
+
+import numpy as np
+from PIL import Image, ImageDraw, ImageFont, ImageOps, ImageStat
+
+try: # pragma: no cover - OpenCV optional during tests
+ import cv2
+except Exception: # pragma: no cover - gracefully degrade if OpenCV missing
+ cv2 = None
+
+from core.ai.best_photo_selector import (
+ BestPhotoSelector,
+ BestShotResult,
+ DEFAULT_METRIC_SPECS,
+ EyeStateAnalyzer,
+ MetricSpec,
+)
from core.app_settings import (
+ PerformanceMode,
+ get_custom_thread_count,
get_best_shot_engine,
get_openai_config,
+ get_performance_mode,
+ get_preferred_torch_device,
+ calculate_max_workers,
DEFAULT_OPENAI_API_KEY,
DEFAULT_OPENAI_MODEL,
DEFAULT_OPENAI_BASE_URL,
@@ -62,6 +81,26 @@ class BestShotEngine(str, Enum):
"Provide one concise sentence noting the dominant strengths and the limiting flaw(s)."
)
+MAX_LOCAL_ANALYSIS_EDGE = 1024
+RESPONSIVE_LOCAL_ANALYSIS_EDGE = 640
+PERFORMANCE_RATIO_THRESHOLD = 0.95
+PREFILTER_PREVIEW_MAX_EDGE = 512
+PREFILTER_MAX_CANDIDATES = 3
+PREFILTER_MIN_CLUSTER_SIZE = 4
+HEURISTIC_SHARPNESS_NORMALIZER = 250.0
+HEURISTIC_CONTRAST_NORMALIZER = 75.0
+RATING_THRESHOLDS = (0.16, 0.34, 0.44, 0.64)
+QUALITY_NORMALIZATION_RANGES = {
+ "musiq_raw": (25.0, 85.0),
+ "liqe_raw": (30.0, 90.0),
+ "maniqa_raw": (0.25, 0.85),
+}
+
+if hasattr(Image, "Resampling"):
+ _RESAMPLE_BEST = Image.Resampling.LANCZOS
+else: # pragma: no cover - Pillow < 10
+ _RESAMPLE_BEST = Image.LANCZOS
+
@dataclass
class LLMConfig:
@@ -81,6 +120,242 @@ def __post_init__(self) -> None:
self.rating_prompt = DEFAULT_RATING_PROMPT
+@dataclass(frozen=True)
+class LocalAnalysisProfile:
+ name: str
+ max_edge: int
+ metric_specs: Sequence[MetricSpec]
+
+
+def _metric_specs_for(names: Sequence[str]) -> Tuple[MetricSpec, ...]:
+ enabled = {name.lower() for name in names}
+ filtered = tuple(
+ spec for spec in DEFAULT_METRIC_SPECS if spec.name.lower() in enabled
+ )
+ return filtered or tuple(DEFAULT_METRIC_SPECS)
+
+
+_PERFORMANCE_ANALYSIS_PROFILE = LocalAnalysisProfile(
+ name="performance",
+ max_edge=MAX_LOCAL_ANALYSIS_EDGE,
+ metric_specs=tuple(DEFAULT_METRIC_SPECS),
+)
+
+_RESPONSIVE_ANALYSIS_PROFILE = LocalAnalysisProfile(
+ name="responsive",
+ max_edge=RESPONSIVE_LOCAL_ANALYSIS_EDGE,
+ metric_specs=_metric_specs_for(("musiq", "maniqa")),
+)
+
+
+def _calculate_custom_thread_ratio() -> Optional[float]:
+ cpu_count = os.cpu_count() or 0
+ if cpu_count <= 0:
+ return None
+ try:
+ custom_threads = get_custom_thread_count()
+ except Exception:
+ return None
+ clamped = max(1, min(cpu_count, int(custom_threads)))
+ return clamped / float(cpu_count)
+
+
+def select_local_analysis_profile(
+ mode: PerformanceMode,
+ *,
+ custom_thread_ratio: Optional[float] = None,
+) -> LocalAnalysisProfile:
+ if mode in (PerformanceMode.PERFORMANCE, PerformanceMode.BALANCED):
+ return _PERFORMANCE_ANALYSIS_PROFILE
+ if mode == PerformanceMode.CUSTOM and custom_thread_ratio is not None:
+ if custom_thread_ratio >= PERFORMANCE_RATIO_THRESHOLD:
+ return _PERFORMANCE_ANALYSIS_PROFILE
+ return _RESPONSIVE_ANALYSIS_PROFILE
+
+
+def _determine_local_analysis_profile() -> LocalAnalysisProfile:
+ mode = get_performance_mode()
+ ratio = _calculate_custom_thread_ratio() if mode == PerformanceMode.CUSTOM else None
+ profile = select_local_analysis_profile(mode, custom_thread_ratio=ratio)
+ logger.info(
+ "Using '%s' local AI profile (max edge %d px, metrics: %s)",
+ profile.name,
+ profile.max_edge,
+ ", ".join(spec.name.upper() for spec in profile.metric_specs),
+ )
+ return profile
+
+
+@dataclass(frozen=True)
+class HeuristicCandidate:
+ image_path: str
+ score: float
+ sharpness: float
+ exposure_balance: float
+ histogram_balance: float
+ eye_openness: float
+
+ def as_dict(self) -> Dict[str, float]:
+ return {
+ "score": self.score,
+ "sharpness": self.sharpness,
+ "exposure_balance": self.exposure_balance,
+ "histogram_balance": self.histogram_balance,
+ "eye_openness": self.eye_openness,
+ }
+
+
+class FastHeuristicStage:
+ """
+ Lightweight heuristics that quickly reject obviously bad frames before heavy IQA.
+
+ Signals reuse the same Laplacian variance idea as the blur detector plus
+ coarse histogram/contrast checks and (optionally) the eye-state classifier.
+ """
+
+ def __init__(
+ self, image_pipeline, preview_max_edge: int = PREFILTER_PREVIEW_MAX_EDGE
+ ):
+ self._image_pipeline = image_pipeline
+ self._preview_max_edge = preview_max_edge
+ self._eye_detection_disabled = False
+ self._eye_analyzer_local = threading.local()
+
+ def _load_preview(self, image_path: str) -> Optional[Image.Image]:
+ preview = None
+ if self._image_pipeline is not None:
+ try:
+ preview = self._image_pipeline.get_preview_image(image_path)
+ if preview is not None:
+ preview = preview.copy()
+ except Exception:
+ logger.debug(
+ "Heuristic preview load failed via pipeline for %s",
+ image_path,
+ exc_info=True,
+ )
+ if preview is None:
+ try:
+ with Image.open(image_path) as raw:
+ prepared = ImageOps.exif_transpose(raw)
+ preview = prepared.convert("RGB").copy()
+ except Exception:
+ logger.debug(
+ "Heuristic preview load failed from disk for %s",
+ image_path,
+ exc_info=True,
+ )
+ return None
+ try:
+ prepared = _downscale_image(preview, self._preview_max_edge)
+ if prepared is preview:
+ # Ensure caller gets a live image even if no resize was needed.
+ prepared = prepared.copy()
+ return prepared
+ finally:
+ try:
+ preview.close()
+ except Exception:
+ pass
+
+ def _estimate_sharpness(self, image: Image.Image) -> float:
+ if cv2 is None:
+ return 0.5
+ try:
+ gray = np.array(image.convert("L"))
+ variance = float(cv2.Laplacian(gray, cv2.CV_64F).var())
+ normalized = variance / HEURISTIC_SHARPNESS_NORMALIZER
+ return max(0.0, min(1.0, normalized))
+ except Exception:
+ logger.debug("Sharpness heuristic failed", exc_info=True)
+ return 0.5
+
+ @staticmethod
+ def _estimate_exposure_balance(image: Image.Image) -> float:
+ gray = image.convert("L")
+ stats = ImageStat.Stat(gray)
+ mean_luma = stats.mean[0] / 255.0 if stats.mean else 0.5
+ stddev = (
+ stats.stddev[0] if stats.stddev else 0.0
+ ) / HEURISTIC_CONTRAST_NORMALIZER
+ brightness_penalty = min(1.0, abs(mean_luma - 0.5) * 2.0)
+ score = max(0.0, 1.0 - brightness_penalty)
+ contrast_bonus = max(0.0, min(1.0, stddev))
+ return 0.6 * score + 0.4 * contrast_bonus
+
+ @staticmethod
+ def _estimate_histogram_balance(image: Image.Image) -> float:
+ gray = image.convert("L")
+ hist = gray.histogram()
+ total = sum(hist)
+ if total <= 0:
+ return 0.5
+ tail_bins = 6
+ shadow_ratio = sum(hist[:tail_bins]) / total
+ highlight_ratio = sum(hist[-tail_bins:]) / total
+ clipping = shadow_ratio + highlight_ratio
+ return max(0.0, 1.0 - clipping * 3.0)
+
+ def _get_eye_analyzer(self) -> Optional["EyeStateAnalyzer"]:
+ if self._eye_detection_disabled:
+ return None
+ analyzer = getattr(self._eye_analyzer_local, "instance", None)
+ if analyzer is not None:
+ return analyzer
+ try:
+ analyzer = EyeStateAnalyzer(max_faces=1)
+ except Exception:
+ logger.warning(
+ "EyeStateAnalyzer unavailable; heuristic stage will skip eye checks."
+ )
+ self._eye_detection_disabled = True
+ return None
+ self._eye_analyzer_local.instance = analyzer
+ return analyzer
+
+ def _estimate_eye_openness(self, image: Image.Image) -> float:
+ analyzer = self._get_eye_analyzer()
+ if analyzer is None:
+ return 0.5
+ try:
+ probability = analyzer.predict_open_probability(image)
+ if probability is None:
+ return 0.5
+ return max(0.0, min(1.0, float(probability)))
+ except Exception:
+ logger.debug("Eye-state heuristic failed", exc_info=True)
+ return 0.5
+
+ def evaluate(self, image_path: str) -> Optional[HeuristicCandidate]:
+ preview = self._load_preview(image_path)
+ if preview is None:
+ return None
+ try:
+ sharpness = self._estimate_sharpness(preview)
+ exposure = self._estimate_exposure_balance(preview)
+ histogram_balance = self._estimate_histogram_balance(preview)
+ eye_openness = self._estimate_eye_openness(preview)
+ score = (
+ 0.5 * sharpness
+ + 0.2 * exposure
+ + 0.15 * histogram_balance
+ + 0.15 * eye_openness
+ )
+ return HeuristicCandidate(
+ image_path=image_path,
+ score=score,
+ sharpness=sharpness,
+ exposure_balance=exposure,
+ histogram_balance=histogram_balance,
+ eye_openness=eye_openness,
+ )
+ finally:
+ try:
+ preview.close()
+ except Exception:
+ pass
+
+
def _load_font(image_size: Tuple[int, int]) -> ImageFont.ImageFont:
longer_side = max(image_size)
font_size = max(24, int(longer_side * 0.08))
@@ -118,16 +393,33 @@ def _image_to_base64(image: Image.Image) -> str:
return base64.b64encode(buf.getvalue()).decode("utf-8")
+def _downscale_image(
+ image: Image.Image, max_edge: int = MAX_LOCAL_ANALYSIS_EDGE
+) -> Image.Image:
+ width, height = image.size
+ longest = max(width, height)
+ if longest <= max_edge:
+ return image
+ scale = max_edge / float(longest)
+ new_size = (
+ max(1, int(round(width * scale))),
+ max(1, int(round(height * scale))),
+ )
+ return image.resize(new_size, _RESAMPLE_BEST)
+
+
class BaseBestShotStrategy:
def __init__(
self,
models_root: Optional[str],
image_pipeline,
llm_config: Optional[LLMConfig] = None,
+ status_callback: Optional[Callable[[str], None]] = None,
) -> None:
self.models_root = models_root
self.image_pipeline = image_pipeline
self.llm_config = llm_config
+ self._status_callback = status_callback
@property
def max_workers(self) -> int:
@@ -148,10 +440,93 @@ def validate_connection(self) -> None:
"""Optional connectivity check before work begins."""
+def _normalize_for_rating(value: float, *, lower: float, upper: float) -> float:
+ if upper <= lower:
+ return 0.0
+ normalized = (value - lower) / (upper - lower)
+ return max(0.0, min(1.0, normalized))
+
+
+def _map_score_to_rating(normalized_score: float) -> int:
+ for idx, threshold in enumerate(RATING_THRESHOLDS, start=1):
+ if normalized_score < threshold:
+ return idx
+ return len(RATING_THRESHOLDS) + 1
+
+
+def _compute_quality_rating(result) -> Tuple[int, float]:
+ def _is_number(value: object) -> bool:
+ return isinstance(value, (int, float)) and not isinstance(value, bool)
+
+ quality_score: Optional[float] = None
+
+ composite = getattr(result, "composite_score", None)
+ if _is_number(composite):
+ composite_value = float(composite)
+ if math.isfinite(composite_value):
+ quality_score = composite_value
+
+ if quality_score is None:
+ metrics = getattr(result, "metrics", {}) or {}
+ metric_values = [
+ float(value) for value in metrics.values() if _is_number(value)
+ ]
+ if metric_values:
+ quality_score = sum(metric_values) / len(metric_values)
+
+ if quality_score is None:
+ samples: List[float] = []
+ raw = getattr(result, "raw_metrics", {}) or {}
+ for field_name, (lower, upper) in QUALITY_NORMALIZATION_RANGES.items():
+ raw_value = raw.get(field_name)
+ if _is_number(raw_value):
+ samples.append(
+ _normalize_for_rating(raw_value, lower=lower, upper=upper)
+ )
+ if samples:
+ quality_score = sum(samples) / len(samples)
+
+ if quality_score is None:
+ quality_score = 0.0
+
+ quality_score = max(0.0, min(1.0, float(quality_score)))
+ rating = _map_score_to_rating(quality_score)
+ return rating, quality_score
+
+
class LocalBestShotStrategy(BaseBestShotStrategy):
- def __init__(self, models_root, image_pipeline, llm_config=None) -> None:
- super().__init__(models_root, image_pipeline, llm_config)
+ def __init__(
+ self,
+ models_root,
+ image_pipeline,
+ llm_config=None,
+ status_callback: Optional[Callable[[str], None]] = None,
+ ) -> None:
+ super().__init__(
+ models_root, image_pipeline, llm_config, status_callback=status_callback
+ )
self._thread_local = threading.local()
+ self._device_hint = get_preferred_torch_device()
+ self._analysis_profile = _determine_local_analysis_profile()
+ self._max_local_analysis_edge = self._analysis_profile.max_edge
+ self._metric_specs = self._analysis_profile.metric_specs
+ self._prefilter_stage = FastHeuristicStage(image_pipeline)
+ responsive_profile = self._analysis_profile is _RESPONSIVE_ANALYSIS_PROFILE
+ min_prefilter_workers = 1 if responsive_profile else 2
+ max_prefilter_workers = 2 if responsive_profile else 4
+ self._prefilter_workers = max(
+ 1,
+ calculate_max_workers(
+ min_workers=min_prefilter_workers, max_workers=max_prefilter_workers
+ ),
+ )
+ logger.info(
+ "Local best-shot strategy targeting torch device '%s'", self._device_hint
+ )
+
+ @property
+ def max_workers(self) -> int:
+ return calculate_max_workers(min_workers=1, max_workers=8)
def _get_selector(self) -> BestPhotoSelector:
selector = getattr(self._thread_local, "selector", None)
@@ -159,25 +534,24 @@ def _get_selector(self) -> BestPhotoSelector:
# Use image pipeline for better RAW and format support
image_loader = self._create_image_loader() if self.image_pipeline else None
selector = BestPhotoSelector(
- models_root=self.models_root, image_loader=image_loader
+ models_root=self.models_root,
+ image_loader=image_loader,
+ status_callback=self._status_callback,
+ device=self._device_hint,
+ metric_specs=self._metric_specs,
)
self._thread_local.selector = selector
return selector
def _create_image_loader(self):
- """Create an image loader that uses the image pipeline for RAW and format support."""
+ """Create an image loader using the app pipeline + downscaling for efficiency."""
def pipeline_image_loader(image_path: str) -> Image.Image:
try:
# Use image pipeline to get preview (handles RAW files properly)
preview = self.image_pipeline.get_preview_image(image_path)
if preview is not None:
- if preview.mode != "RGB":
- preview = preview.convert("RGB")
- # Ensure required metadata is set
- preview.info.setdefault("source_path", image_path)
- preview.info.setdefault("region", "full")
- return preview
+ return self._prepare_image(preview, image_path)
except Exception as exc:
logger.warning("Image pipeline failed for %s: %s", image_path, exc)
@@ -194,13 +568,9 @@ def pipeline_image_loader(image_path: str) -> Image.Image:
".webp",
}:
try:
- from PIL import ImageOps
-
with Image.open(image_path) as img:
- prepared = ImageOps.exif_transpose(img).convert("RGB")
- prepared.info["source_path"] = image_path
- prepared.info["region"] = "full"
- return prepared.copy()
+ prepared = ImageOps.exif_transpose(img)
+ return self._prepare_image(prepared, image_path)
except Exception as exc:
logger.error(
"Failed to load standard format image %s: %s", image_path, exc
@@ -214,21 +584,204 @@ def pipeline_image_loader(image_path: str) -> Image.Image:
return pipeline_image_loader
+ def _prepare_image(self, image: Image.Image, source_path: str) -> Image.Image:
+ prepared = image.copy()
+ if prepared.mode != "RGB":
+ prepared = prepared.convert("RGB")
+ prepared = _downscale_image(prepared, self._max_local_analysis_edge)
+ prepared.info.setdefault("source_path", source_path)
+ prepared.info.setdefault("region", "full")
+ return prepared
+
+ def _evaluate_prefilter_candidates(
+ self, stage: FastHeuristicStage, image_paths: Sequence[str]
+ ) -> Dict[str, Optional[HeuristicCandidate]]:
+ results: Dict[str, Optional[HeuristicCandidate]] = {}
+ if not image_paths:
+ return results
+ worker_count = min(self._prefilter_workers, len(image_paths))
+ if worker_count <= 1:
+ for path in image_paths:
+ try:
+ results[path] = stage.evaluate(path)
+ except Exception:
+ logger.debug(
+ "Heuristic evaluation failed for %s", path, exc_info=True
+ )
+ results[path] = None
+ else:
+ with ThreadPoolExecutor(max_workers=worker_count) as executor:
+ future_map = {
+ executor.submit(stage.evaluate, path): path for path in image_paths
+ }
+ for future in as_completed(future_map):
+ path = future_map[future]
+ try:
+ results[path] = future.result()
+ except Exception:
+ logger.debug(
+ "Heuristic evaluation raised unexpectedly for %s",
+ path,
+ exc_info=True,
+ )
+ results[path] = None
+ for path in image_paths:
+ results.setdefault(path, None)
+ return results
+
+ def _prefilter_cluster(
+ self, cluster_id: int, image_paths: Sequence[str]
+ ) -> Tuple[List[str], Dict[str, HeuristicCandidate]]:
+ if len(image_paths) < PREFILTER_MIN_CLUSTER_SIZE:
+ return list(image_paths), {}
+
+ limit = min(PREFILTER_MAX_CANDIDATES, len(image_paths))
+ if limit >= len(image_paths):
+ return list(image_paths), {}
+
+ stage = self._prefilter_stage
+ if stage is None:
+ return list(image_paths), {}
+
+ evaluations = self._evaluate_prefilter_candidates(stage, image_paths)
+ scored: List[HeuristicCandidate] = []
+ fallbacks: List[str] = []
+ for path in image_paths:
+ candidate = evaluations.get(path)
+ if candidate is None:
+ fallbacks.append(path)
+ continue
+ scored.append(candidate)
+
+ if not scored:
+ return list(image_paths), {}
+
+ scored.sort(key=lambda c: c.score, reverse=True)
+ selected = [candidate.image_path for candidate in scored[:limit]]
+ if len(selected) < limit:
+ for path in fallbacks:
+ if path not in selected:
+ selected.append(path)
+ if len(selected) >= limit:
+ break
+
+ if len(selected) < len(image_paths):
+ logger.info(
+ "Heuristic prefilter reduced cluster %s from %d to %d candidates",
+ cluster_id,
+ len(image_paths),
+ len(selected),
+ )
+
+ info_map = {candidate.image_path: candidate for candidate in scored}
+ return selected, info_map
+
def rank_cluster(
self, cluster_id: int, image_paths: Sequence[str]
) -> List[Dict[str, object]]:
logger.info(
f"Local AI ranking cluster {cluster_id} with {len(image_paths)} images using local models"
)
- selector = self._get_selector()
- results = selector.rank_images(image_paths)
- ranked_results = [r.to_dict() for r in results]
+ candidate_paths, prefilter_map = self._prefilter_cluster(
+ cluster_id, image_paths
+ )
+ if len(candidate_paths) != len(image_paths):
+ logger.info(
+ "Cluster %s trimmed to %d candidate(s) prior to IQA",
+ cluster_id,
+ len(candidate_paths),
+ )
+ worker_count = min(self.max_workers, len(candidate_paths))
+ if worker_count > 1:
+ logger.debug(
+ "Parallel IQA scoring enabled for cluster %s with %d worker(s)",
+ cluster_id,
+ worker_count,
+ )
+ result_objects = self._rank_images_parallel(candidate_paths, worker_count)
+ else:
+ selector = self._get_selector()
+ result_objects = selector.rank_images(candidate_paths)
+ ranked_results: List[Dict[str, object]] = []
+ for result in result_objects:
+ payload = result.to_dict()
+ info = prefilter_map.get(payload.get("image_path"))
+ if info:
+ payload["prefilter"] = info.as_dict()
+ if logger.isEnabledFor(logging.DEBUG):
+ image_name = os.path.basename(payload.get("image_path", ""))
+ composite = payload.get("composite_score", 0.0)
+ metrics = payload.get("metrics") or {}
+ metric_summary = ", ".join(
+ f"{name.upper()} {value:.3f}"
+ for name, value in sorted(metrics.items())
+ if isinstance(value, (int, float))
+ )
+ eye_value = metrics.get("eyes_open")
+ if (
+ isinstance(eye_value, (int, float))
+ and "EYES_OPEN" not in metric_summary
+ ):
+ metric_summary = (
+ f"{metric_summary}, EYES_OPEN {eye_value:.3f}"
+ if metric_summary
+ else f"EYES_OPEN {eye_value:.3f}"
+ )
+ prefilter = payload.get("prefilter") or {}
+ if prefilter:
+ prefilter_summary = ", ".join(
+ f"{key}={value:.3f}"
+ if isinstance(value, (int, float))
+ else f"{key}={value}"
+ for key, value in sorted(prefilter.items())
+ )
+ metric_summary = (
+ f"{metric_summary} | prefilter: {prefilter_summary}"
+ if metric_summary
+ else f"prefilter: {prefilter_summary}"
+ )
+ logger.debug(
+ "Cluster %s candidate %s -> composite %.4f%s",
+ cluster_id,
+ image_name or payload.get("image_path"),
+ composite,
+ f" ({metric_summary})" if metric_summary else "",
+ )
+ ranked_results.append(payload)
if ranked_results:
logger.info(
f"Completed local AI ranking for cluster {cluster_id}. Best image: {os.path.basename(ranked_results[0]['image_path'])}"
)
return ranked_results
+ def _rank_images_parallel(
+ self, image_paths: Sequence[str], worker_count: int
+ ) -> List[BestShotResult]:
+ results: List[BestShotResult] = []
+
+ def _evaluate(path: str) -> Optional[BestShotResult]:
+ selector = self._get_selector()
+ return selector.analyze_image(path)
+
+ with ThreadPoolExecutor(max_workers=worker_count) as executor:
+ futures = {executor.submit(_evaluate, path): path for path in image_paths}
+ for future in as_completed(futures):
+ path = futures[future]
+ try:
+ result = future.result()
+ except Exception as exc:
+ logger.warning(
+ "Parallel IQA scoring failed for %s: %s",
+ path,
+ exc,
+ exc_info=True,
+ )
+ continue
+ if result:
+ results.append(result)
+ results.sort(key=lambda r: r.composite_score, reverse=True)
+ return results
+
def rate_image(self, image_path: str) -> Optional[Dict[str, object]]:
logger.info(f"Local AI rating image: {os.path.basename(image_path)}")
selector = self._get_selector()
@@ -236,22 +789,32 @@ def rate_image(self, image_path: str) -> Optional[Dict[str, object]]:
if not results:
return None
result = results[0]
- score = result.composite_score
- rating = max(1, min(5, int(round(score * 4 + 1))))
+ rating, quality_score = _compute_quality_rating(result)
logger.info(
- f"Local AI rated {os.path.basename(image_path)} as {rating}/5 (score: {score:.3f})"
+ "Local AI rated %s as %d/5 (quality score %.3f)",
+ os.path.basename(image_path),
+ rating,
+ quality_score,
)
return {
"image_path": image_path,
"rating": rating,
- "score": score,
+ "score": quality_score,
"metrics": result.metrics,
}
class LLMBestShotStrategy(BaseBestShotStrategy):
- def __init__(self, models_root, image_pipeline, llm_config: LLMConfig) -> None:
- super().__init__(models_root, image_pipeline, llm_config)
+ def __init__(
+ self,
+ models_root,
+ image_pipeline,
+ llm_config: LLMConfig,
+ status_callback: Optional[Callable[[str], None]] = None,
+ ) -> None:
+ super().__init__(
+ models_root, image_pipeline, llm_config, status_callback=status_callback
+ )
try:
from openai import OpenAI # type: ignore
except ImportError as exc: # pragma: no cover
@@ -381,37 +944,6 @@ def _call_llm(
content = getattr(message, "content", None) or ""
return message, content
- def _extract_rating(self, analysis: str) -> Optional[int]:
- if not analysis:
- return None
-
- # Try JSON parsing first, as the prompt requests structured output
- try:
- parsed = json.loads(analysis)
- if isinstance(parsed, dict) and "rating" in parsed:
- return int(round(float(parsed["rating"])))
- except (ValueError, TypeError, json.JSONDecodeError):
- # Fall back to unstructured parsing if the model returned plain text.
- pass
-
- patterns = [
- r"\brating\b[^0-9]*([1-5](?:\.[0-9]+)?)",
- r"\boverall rating\b[^0-9]*([1-5](?:\.[0-9]+)?)",
- r"\bscore\b[^0-9]*([1-5](?:\.[0-9]+)?)",
- r"([1-5])\s*/\s*5",
- r"([1-5])\s*out of\s*5",
- r"([1-5])\s*stars",
- ]
- for pattern in patterns:
- match = re.search(pattern, analysis, re.IGNORECASE)
- if match:
- try:
- return int(round(float(match.group(1))))
- except (ValueError, TypeError):
- continue
-
- return None
-
def validate_connection(self) -> None:
probe_timeout = min(max(5, int(self._timeout * 0.25)), max(self._timeout, 5))
client = self._with_timeout(probe_timeout)
@@ -656,29 +1188,10 @@ def rate_image(self, image_path: str) -> Optional[Dict[str, object]]:
os.path.basename(image_path),
snippet or "",
)
- if structured_payload and not analysis:
- breakdown = structured_payload.get("score_breakdown", {})
- breakdown_parts = [
- f"{name.replace('_', ' ')} {value}" for name, value in breakdown.items()
- ]
- notes = structured_payload.get("notes")
- confidence = structured_payload.get("confidence")
- summary_bits = []
- if breakdown_parts:
- summary_bits.append(" | ".join(breakdown_parts))
- if notes:
- summary_bits.append(notes)
- if confidence:
- summary_bits.append(f"confidence: {confidence}")
- analysis = " ".join(summary_bits)
-
payload = {
"image_path": image_path,
"rating": rating,
- "analysis": analysis,
}
- if structured_payload:
- payload["quality_scores"] = structured_payload
return payload
@@ -688,6 +1201,7 @@ def create_best_shot_strategy(
models_root: Optional[str] = None,
image_pipeline=None,
llm_config: Optional[LLMConfig] = None,
+ status_callback: Optional[Callable[[str], None]] = None,
) -> BaseBestShotStrategy:
"""Create AI strategy for image analysis.
@@ -699,15 +1213,27 @@ def create_best_shot_strategy(
if engine_name == BestShotEngine.LLM.value:
config = llm_config or LLMConfig(**get_openai_config())
logger.info(f"Using LLM strategy with endpoint: {config.base_url}")
- return LLMBestShotStrategy(models_root, image_pipeline, config)
+ return LLMBestShotStrategy(
+ models_root,
+ image_pipeline,
+ config,
+ status_callback=status_callback,
+ )
logger.info("Using local model strategy")
- return LocalBestShotStrategy(models_root, image_pipeline, llm_config)
+ return LocalBestShotStrategy(
+ models_root,
+ image_pipeline,
+ llm_config,
+ status_callback=status_callback,
+ )
__all__ = [
"BestShotEngine",
"LLMBestShotStrategy",
"LocalBestShotStrategy",
+ "LocalAnalysisProfile",
"create_best_shot_strategy",
"LLMConfig",
+ "select_local_analysis_profile",
]
diff --git a/src/core/ai/model_checker.py b/src/core/ai/model_checker.py
index f125968..8882ead 100644
--- a/src/core/ai/model_checker.py
+++ b/src/core/ai/model_checker.py
@@ -1,31 +1,17 @@
-"""
-Model availability checker for best-shot analysis models.
-
-Verifies that all required external models (face detector, eye classifier,
-aesthetic predictor) are present before attempting to instantiate the
-BestPhotoSelector. Raises ModelDependencyError with actionable messages
-when any model is missing.
-"""
+"""Dependency checker for the IQA-based best-shot pipeline."""
from __future__ import annotations
+import importlib.util
import logging
-import os
from dataclasses import dataclass
from typing import List, Optional
logger = logging.getLogger(__name__)
-PROJECT_ROOT = os.path.abspath(
- os.path.join(os.path.dirname(__file__), "..", "..", "..")
-)
-DEFAULT_MODELS_ROOT = os.environ.get(
- "PHOTOSORT_MODELS_DIR", os.path.join(PROJECT_ROOT, "models")
-)
-
class ModelDependencyError(Exception):
- """Raised when one or more required models are missing."""
+ """Raised when one or more required dependencies are missing."""
def __init__(self, missing_models: List["MissingModelInfo"]):
self.missing_models = missing_models
@@ -33,12 +19,12 @@ def __init__(self, missing_models: List["MissingModelInfo"]):
def _format_message(self) -> str:
model_names = ", ".join(m.name for m in self.missing_models)
- return f"Required models not found: {model_names}"
+ return f"Required best-shot dependencies not found: {model_names}"
@dataclass
class MissingModelInfo:
- """Information about a missing model dependency."""
+ """Information about a missing runtime dependency."""
name: str
description: str
@@ -46,101 +32,67 @@ class MissingModelInfo:
download_url: str
-def check_best_shot_models(models_root: Optional[str] = None) -> List[MissingModelInfo]:
- """
- Check for the presence of all required best-shot analysis models.
-
- Args:
- models_root: Root directory where models are stored. Defaults to
- PHOTOSORT_MODELS_DIR env var or PROJECT_ROOT/models.
+def _module_available(module_name: str) -> bool:
+ return importlib.util.find_spec(module_name) is not None
- Returns:
- List of MissingModelInfo for each missing model. Empty list if all present.
- """
- models_root = models_root or DEFAULT_MODELS_ROOT
- missing: List[MissingModelInfo] = []
- # 1. Face detector (BlazeFace ONNX)
- face_detector_paths = [
- os.path.join(models_root, "job_jgzjewkop_optimized_onnx", "model.onnx"),
- os.path.join(
- models_root,
- "MediaPipe-Face-Detection_FaceDetector_float",
- "model.onnx",
+def _dependency_catalog() -> List[tuple[str, MissingModelInfo]]:
+ return [
+ (
+ "torch",
+ MissingModelInfo(
+ name="PyTorch",
+ description=(
+ "Deep learning runtime required by the MUSIQ/MANIQA/LIQE pipeline."
+ ),
+ expected_path="pip install torch --extra-index-url https://download.pytorch.org/whl/cpu",
+ download_url="https://pytorch.org/get-started/locally/",
+ ),
),
- ]
- if not any(os.path.exists(p) for p in face_detector_paths):
- missing.append(
+ (
+ "pyiqa",
MissingModelInfo(
- name="Face Detector",
- description="MediaPipe BlazeFace ONNX model for face detection",
- expected_path=os.path.join(models_root, "job_*/model.onnx"),
- download_url="https://huggingface.co/qualcomm/MediaPipe-Face-Detection",
- )
- )
+ name="pyiqa (MUSIQ/MANIQA/LIQE)",
+ description=(
+ "Python Image Quality Assessment package that bundles the"
+ " MUSIQ, MANIQA, and LIQE checkpoints."
+ ),
+ expected_path="pip install pyiqa",
+ download_url="https://github.com/chaofengc/IQA-PyTorch",
+ ),
+ ),
+ ]
- # 2. Eye-state classifier
- eye_classifier_dir = os.path.join(
- models_root, "open-closed-eye-classification-mobilev2"
- )
- if not os.path.isdir(eye_classifier_dir):
- missing.append(
- MissingModelInfo(
- name="Eye Classifier",
- description="MobileNetV2 model for open/closed eye classification",
- expected_path=eye_classifier_dir,
- download_url="https://huggingface.co/MichalMlodawski/open-closed-eye-classification-mobilev2",
- )
- )
- # 3. Aesthetic predictor
- aesthetic_dir = os.path.join(models_root, "aesthetic_predictor")
- if not os.path.isdir(aesthetic_dir):
- missing.append(
- MissingModelInfo(
- name="Aesthetic Predictor",
- description="CLIP-based aesthetic scoring model",
- expected_path=aesthetic_dir,
- download_url="https://huggingface.co/shunk031/aesthetics-predictor-v2-sac-logos-ava1-l14-linearMSE",
- )
- )
+def check_best_shot_models(models_root: Optional[str] = None) -> List[MissingModelInfo]:
+ """Ensure the IQA pipeline dependencies are present.
- # 4. BlazeFace anchors (bundled, but check just in case)
- bundled_anchors = os.path.join(
- os.path.dirname(__file__), "data", "blazeface_anchors.npy"
- )
- user_anchors = os.path.join(models_root, "blazeface_anchors.npy")
- if not os.path.exists(bundled_anchors) and not os.path.exists(user_anchors):
- missing.append(
- MissingModelInfo(
- name="BlazeFace Anchors",
- description="Anchor tensor for BlazeFace detector (usually bundled)",
- expected_path=user_anchors,
- download_url="https://github.com/duartebarbosadev/PhotoSort",
- )
+ Args:
+ models_root: Legacy argument for backwards compatibility (no longer used).
+ """
+
+ if models_root:
+ logger.debug(
+ "models_root argument is ignored for the IQA pipeline: %s", models_root
)
+ missing: List[MissingModelInfo] = []
+ for module_name, info in _dependency_catalog():
+ if not _module_available(module_name):
+ missing.append(info)
+
if missing:
logger.warning(
- "Best-shot models check failed: %d model(s) missing",
+ "Best-shot dependency check failed: %d missing item(s)",
len(missing),
)
else:
- logger.info("All best-shot models are present.")
+ logger.info("All IQA dependencies detected for best-shot analysis.")
return missing
def ensure_best_shot_models(models_root: Optional[str] = None) -> None:
- """
- Verify all best-shot models are present, raising ModelDependencyError if not.
-
- Args:
- models_root: Root directory where models are stored.
-
- Raises:
- ModelDependencyError: If any required model is missing.
- """
missing = check_best_shot_models(models_root)
if missing:
raise ModelDependencyError(missing)
diff --git a/src/core/app_settings.py b/src/core/app_settings.py
index a05b71c..20fa6eb 100644
--- a/src/core/app_settings.py
+++ b/src/core/app_settings.py
@@ -4,6 +4,7 @@
"""
import os
+from dataclasses import dataclass
from enum import Enum
from typing import Optional
from PyQt6.QtCore import QSettings
@@ -82,6 +83,16 @@ def from_string(cls, value: str) -> "PerformanceMode":
DEFAULT_OPENAI_MAX_WORKERS = 4
DEFAULT_BEST_SHOT_BATCH_SIZE = 3
+
+@dataclass(frozen=True)
+class LocalBestShotConstants:
+ model_stride: int = 32
+ tensor_cache_key: str = "_photosort_pyiqa_tensor"
+ eye_fallback_max_edge: int = 2048
+
+
+_LOCAL_BEST_SHOT_CONSTANTS = LocalBestShotConstants()
+
# --- UI Constants ---
# Grid view settings
FIXED_ICON_SIZE = 96 # Fixed icon size for grid view
@@ -284,6 +295,29 @@ def is_pytorch_cuda_available() -> bool:
return False
+def get_preferred_torch_device() -> str:
+ """Return the fastest available torch.device string (cuda, mps, or cpu)."""
+
+ try:
+ import torch
+ except ImportError:
+ return "cpu"
+
+ if torch.cuda.is_available():
+ return "cuda"
+
+ mps_backend = getattr(getattr(torch, "backends", None), "mps", None)
+ if mps_backend is not None:
+ try:
+ if mps_backend.is_available(): # type: ignore[attr-defined]
+ return "mps"
+ except Exception:
+ # Ignore transient failures when probing MPS; fall back to CPU instead.
+ pass
+
+ return "cpu"
+
+
# --- Orientation Model ---
def get_orientation_model_name() -> str | None:
"""Gets the configured orientation model name from settings."""
@@ -423,6 +457,11 @@ def set_best_shot_batch_size(batch_size: int) -> None:
settings.setValue(BEST_SHOT_BATCH_SIZE_KEY, max(2, int(batch_size)))
+def get_local_best_shot_constants() -> LocalBestShotConstants:
+ """Return immutable constants for the local best-shot pipeline."""
+ return _LOCAL_BEST_SHOT_CONSTANTS
+
+
def get_openai_config() -> dict:
settings = _get_settings()
diff --git a/src/core/image_features/model_rotation_detector.py b/src/core/image_features/model_rotation_detector.py
index ba30266..2a7fdc2 100644
--- a/src/core/image_features/model_rotation_detector.py
+++ b/src/core/image_features/model_rotation_detector.py
@@ -25,6 +25,7 @@
set_orientation_model_name,
ROTATION_MODEL_IMAGE_SIZE,
)
+from core.runtime_paths import is_frozen_runtime, resolve_runtime_root
logger = logging.getLogger(__name__)
@@ -221,13 +222,8 @@ def _resolve_model_path(self) -> Optional[str]:
"""
# Build candidate base dirs
base_dirs = []
- try:
- import sys as _sys
-
- if getattr(_sys, "_MEIPASS", None): # type: ignore[attr-defined]
- base_dirs.append(os.path.join(_sys._MEIPASS, MODEL_SAVE_DIR)) # type: ignore[attr-defined]
- except Exception:
- pass
+ if is_frozen_runtime():
+ base_dirs.append(os.path.join(resolve_runtime_root(), MODEL_SAVE_DIR))
project_root = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..", "..")
diff --git a/src/core/numpy_compat.py b/src/core/numpy_compat.py
new file mode 100644
index 0000000..fa5b2b0
--- /dev/null
+++ b/src/core/numpy_compat.py
@@ -0,0 +1,76 @@
+"""Compatibility helpers for third-party packages expecting NumPy < 2.0.
+
+Some dependencies (e.g. imgaug via pyiqa) still rely on ``np.sctypes`` which
+was removed in NumPy 2.0. Import this module early to reintroduce the
+attribute so those packages do not crash at import-time.
+"""
+
+from __future__ import annotations
+
+import numpy as np
+
+__all__ = ["ensure_numpy_sctypes"]
+
+
+def _collect_scalar_types(*type_names: str):
+ """Return unique NumPy scalar types for the provided attribute names."""
+ seen = set()
+ result = []
+ for name in type_names:
+ attr = getattr(np, name, None)
+ if attr is None or attr in seen:
+ continue
+ seen.add(attr)
+ result.append(attr)
+ return result
+
+
+def ensure_numpy_sctypes() -> None:
+ """Recreate ``np.sctypes`` when running on NumPy 2.0+."""
+ if hasattr(np, "sctypes"):
+ return
+
+ np.sctypes = { # type: ignore[attr-defined]
+ "int": _collect_scalar_types(
+ "byte",
+ "short",
+ "intc",
+ "intp",
+ "int_",
+ "longlong",
+ ),
+ "uint": _collect_scalar_types(
+ "ubyte",
+ "ushort",
+ "uintc",
+ "uintp",
+ "uint",
+ "ulonglong",
+ ),
+ "float": _collect_scalar_types(
+ "half",
+ "single",
+ "double",
+ "longdouble",
+ "float16",
+ "float32",
+ "float64",
+ ),
+ "complex": _collect_scalar_types(
+ "csingle",
+ "cdouble",
+ "clongdouble",
+ "complex64",
+ "complex128",
+ ),
+ "others": _collect_scalar_types(
+ "bool_",
+ "bytes_",
+ "str_",
+ "void",
+ ),
+ "character": _collect_scalar_types(
+ "bytes_",
+ "str_",
+ ),
+ }
diff --git a/src/core/runtime_paths.py b/src/core/runtime_paths.py
new file mode 100644
index 0000000..c637191
--- /dev/null
+++ b/src/core/runtime_paths.py
@@ -0,0 +1,40 @@
+"""Helpers for working with PyInstaller/runtime resource locations."""
+
+from __future__ import annotations
+
+import os
+import sys
+from typing import List, Optional
+
+
+def is_frozen_runtime() -> bool:
+ """Return True when running inside a PyInstaller bundle."""
+ return bool(getattr(sys, "frozen", False) or getattr(sys, "_MEIPASS", None))
+
+
+def resolve_runtime_root(fallback: Optional[str] = None) -> str:
+ """Resolve the base directory for resource lookups.
+
+ When frozen, prefer PyInstaller's extraction directory, otherwise the
+ directory containing the executable. During source runs, fall back to the
+ provided path (typically the project root) or the current working directory.
+ """
+ meipass = getattr(sys, "_MEIPASS", None)
+ if meipass:
+ return meipass
+ if getattr(sys, "frozen", False):
+ return os.path.dirname(sys.executable)
+ if fallback:
+ return fallback
+ return os.getcwd()
+
+
+def iter_bundle_roots(include_executable_dir: bool = False) -> List[str]:
+ """Return candidate directories that may contain bundled resources."""
+ locations: List[str] = []
+ meipass = getattr(sys, "_MEIPASS", None)
+ if meipass:
+ locations.append(meipass)
+ if include_executable_dir and getattr(sys, "frozen", False):
+ locations.append(os.path.dirname(sys.executable))
+ return locations
diff --git a/src/core/similarity_engine.py b/src/core/similarity_engine.py
index 6d73555..1665db9 100644
--- a/src/core/similarity_engine.py
+++ b/src/core/similarity_engine.py
@@ -8,6 +8,11 @@
from sklearn.cluster import DBSCAN
from core.image_pipeline import ImagePipeline
+from core.similarity_utils import (
+ adaptive_dbscan_eps,
+ l2_normalize_rows,
+ normalize_embedding_dict,
+)
from .app_settings import (
DEFAULT_CLIP_MODEL,
is_pytorch_cuda_available,
@@ -123,6 +128,13 @@ def _load_cached_embeddings(self) -> Dict[str, List[float]]:
logger.info(f"Loading embeddings cache: {self._cache_path}")
with open(self._cache_path, "rb") as f:
cache_data = pickle.load(f)
+ if isinstance(cache_data, dict) and cache_data:
+ if normalize_embedding_dict(cache_data):
+ logger.info(
+ "Detected legacy non-normalized embeddings. "
+ "Updating cache to normalized vectors."
+ )
+ self._save_embeddings_to_cache(cache_data)
logger.info(
f"Loaded {len(cache_data)} embeddings from cache in {time.perf_counter() - cache_load_start_time:.4f}s"
)
@@ -229,6 +241,8 @@ def generate_embeddings_for_files(self, file_paths: List[str]):
batch_embeds = self.model.encode(
batch_images, show_progress_bar=False, convert_to_numpy=True
)
+ batch_embeds = np.asarray(batch_embeds, dtype=np.float32)
+ batch_embeds = l2_normalize_rows(batch_embeds)
for path_idx, path in enumerate(valid_paths_in_batch):
new_embeddings[path] = batch_embeds[path_idx].tolist()
@@ -283,19 +297,26 @@ def cluster_embeddings(self, embeddings: Dict[str, List[float]]):
filepaths = list(embeddings.keys())
embedding_matrix = np.array(list(embeddings.values()), dtype=np.float32)
+ embedding_matrix = l2_normalize_rows(embedding_matrix)
num_samples, _ = embedding_matrix.shape
labels = None
+ adaptive_eps = adaptive_dbscan_eps(
+ embedding_matrix, DBSCAN_EPS, DBSCAN_MIN_SAMPLES
+ )
try:
logger.info(
- f"Running DBSCAN clustering on {num_samples} embeddings (eps={DBSCAN_EPS}, min_samples={DBSCAN_MIN_SAMPLES})."
+ "Running DBSCAN clustering on %d embeddings (eps=%.4f, min_samples=%d).",
+ num_samples,
+ adaptive_eps,
+ DBSCAN_MIN_SAMPLES,
)
# Ensure embedding_matrix is C-contiguous, which is expected by DBSCAN
if not embedding_matrix.flags["C_CONTIGUOUS"]:
embedding_matrix = np.ascontiguousarray(embedding_matrix)
dbscan = DBSCAN(
- eps=DBSCAN_EPS, min_samples=DBSCAN_MIN_SAMPLES, metric="cosine"
+ eps=adaptive_eps, min_samples=DBSCAN_MIN_SAMPLES, metric="cosine"
)
dbscan_labels = dbscan.fit_predict(embedding_matrix)
diff --git a/src/core/similarity_utils.py b/src/core/similarity_utils.py
new file mode 100644
index 0000000..5ea21cb
--- /dev/null
+++ b/src/core/similarity_utils.py
@@ -0,0 +1,74 @@
+from __future__ import annotations
+
+import logging
+from typing import Dict, List, Tuple
+
+import numpy as np
+from sklearn.neighbors import NearestNeighbors
+
+logger = logging.getLogger(__name__)
+
+
+def l2_normalize_rows(matrix: np.ndarray) -> np.ndarray:
+ """Return a row-wise L2-normalized copy of the matrix."""
+ if matrix.size == 0:
+ return matrix
+ norms = np.linalg.norm(matrix, axis=1, keepdims=True)
+ norms = np.where(norms == 0, 1.0, norms)
+ return matrix / norms
+
+
+def normalize_embedding_vector(values: List[float]) -> Tuple[List[float], bool]:
+ """Normalize a single embedding vector, returning (normalized_list, changed_flag)."""
+ arr = np.asarray(values, dtype=np.float32)
+ norm = float(np.linalg.norm(arr))
+ if not np.isfinite(norm) or norm == 0.0:
+ return arr.tolist(), False
+ if abs(norm - 1.0) <= 1e-4:
+ return arr.tolist(), False
+ return (arr / norm).tolist(), True
+
+
+def normalize_embedding_dict(embeddings: Dict[str, List[float]]) -> bool:
+ """Normalize all embedding vectors in-place. Returns True if any were updated."""
+ updated = False
+ for path, vector in list(embeddings.items()):
+ if not isinstance(vector, (list, tuple, np.ndarray)):
+ continue
+ normalized, changed = normalize_embedding_vector(list(vector))
+ if changed:
+ embeddings[path] = normalized
+ updated = True
+ return updated
+
+
+def adaptive_dbscan_eps(
+ embedding_matrix: np.ndarray, base_eps: float, min_samples: int
+) -> float:
+ """Estimate a data-driven epsilon for DBSCAN using cosine k-distances."""
+ sample_count = embedding_matrix.shape[0]
+ if sample_count <= max(min_samples * 2, 4):
+ return base_eps
+ neighbor_count = min(
+ max(min_samples + 1, min_samples * 3), sample_count
+ ) # ensure > min_samples
+ try:
+ nn = NearestNeighbors(metric="cosine", n_neighbors=neighbor_count)
+ nn.fit(embedding_matrix)
+ distances, _ = nn.kneighbors(embedding_matrix)
+ except Exception:
+ logger.exception("Adaptive eps estimation failed; falling back to base epsilon")
+ return base_eps
+
+ kth_index = min_samples - 1
+ if kth_index < 0:
+ return base_eps
+ kth_index = min(kth_index, distances.shape[1] - 1)
+ kth_distances = distances[:, kth_index]
+ finite = kth_distances[np.isfinite(kth_distances)]
+ if finite.size == 0:
+ return base_eps
+
+ adaptive_component = float(np.percentile(finite, 65))
+ adaptive_component = max(0.005, min(0.3, adaptive_component))
+ return float((adaptive_component + base_eps) / 2.0)
diff --git a/src/core/utils/time_utils.py b/src/core/utils/time_utils.py
new file mode 100644
index 0000000..0715f9b
--- /dev/null
+++ b/src/core/utils/time_utils.py
@@ -0,0 +1,28 @@
+"""Time-related helper utilities shared across workers."""
+
+from __future__ import annotations
+
+import math
+
+
+def format_duration(seconds: float) -> str:
+ """
+ Return a compact human-readable duration string like '1h 05m 12s'.
+ Values that are NaN/inf or negative yield an empty string.
+ """
+ if not math.isfinite(seconds):
+ return ""
+ seconds = max(0, int(round(seconds)))
+ hours, remainder = divmod(seconds, 3600)
+ minutes, secs = divmod(remainder, 60)
+ parts: list[str] = []
+ if hours:
+ parts.append(f"{hours}h")
+ if minutes or hours:
+ parts.append(f"{minutes}m")
+ if secs or not parts:
+ parts.append(f"{secs}s")
+ return " ".join(parts)
+
+
+__all__ = ["format_duration"]
diff --git a/src/main.py b/src/main.py
index 9929cc1..3568eab 100644
--- a/src/main.py
+++ b/src/main.py
@@ -8,6 +8,28 @@
if SRC_DIR and SRC_DIR not in sys.path:
sys.path.insert(0, SRC_DIR)
+PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+
+from core.runtime_paths import ( # noqa: E402
+ is_frozen_runtime,
+ iter_bundle_roots,
+ resolve_runtime_root,
+)
+
+
+def _ensure_local_model_cache():
+ """Point PyInstaller builds to the bundled models directory."""
+ if not is_frozen_runtime():
+ return
+ base_dir = resolve_runtime_root(PROJECT_ROOT)
+ models_dir = os.path.abspath(os.path.join(base_dir, "models"))
+ os.makedirs(models_dir, exist_ok=True)
+ os.environ.setdefault("PHOTOSORT_MODELS_DIR", models_dir)
+ os.environ.setdefault("PYIQA_CACHE_DIR", models_dir)
+
+
+_ensure_local_model_cache()
+
# Initialize pyexiv2 before any Qt imports - this is CRITICAL for Windows stability
try:
from core.pyexiv2_init import ensure_pyexiv2_initialized # noqa: E402
@@ -33,16 +55,7 @@ def load_stylesheet(filename: str = "src/ui/dark_theme.qss") -> str:
checking for the temporary extraction directory at runtime.
"""
try:
- # Determine base directory depending on runtime context
- base_dir: str
- meipass = getattr(sys, "_MEIPASS", None) # type: ignore[attr-defined]
- if meipass:
- base_dir = meipass # PyInstaller onefile extraction dir
- elif getattr(sys, "frozen", False): # PyInstaller onedir
- base_dir = os.path.dirname(sys.executable)
- else:
- # Running from source
- base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+ base_dir = resolve_runtime_root(PROJECT_ROOT)
# Candidate locations, in order of preference
candidates = [
@@ -168,18 +181,12 @@ def _find_resource_path(filename: str, include_exe_dir: bool = False) -> Optiona
Returns the first existing file path, or None if not found.
"""
- meipass = getattr(sys, "_MEIPASS", None)
-
- candidates = []
- if meipass:
- candidates.append(os.path.join(meipass, filename))
+ candidates = [
+ os.path.join(root, filename)
+ for root in iter_bundle_roots(include_executable_dir=include_exe_dir)
+ ]
- if include_exe_dir and getattr(sys, "frozen", False):
- candidates.append(os.path.join(os.path.dirname(sys.executable), filename))
-
- candidates.append(
- os.path.join(os.path.dirname(os.path.dirname(__file__)), "assets", filename)
- )
+ candidates.append(os.path.join(PROJECT_ROOT, "assets", filename))
candidates.append(os.path.abspath(filename))
for candidate in candidates:
diff --git a/src/ui/app_controller.py b/src/ui/app_controller.py
index 2ce96fd..9af78f7 100644
--- a/src/ui/app_controller.py
+++ b/src/ui/app_controller.py
@@ -13,6 +13,7 @@
from core.file_scanner import SUPPORTED_EXTENSIONS
from core.image_file_ops import ImageFileOperations
from core.ai.best_photo_selector import DEFAULT_MODELS_ROOT
+from core.pyexiv2_wrapper import PyExiv2Operations
logger = logging.getLogger(__name__)
@@ -45,6 +46,7 @@ def clear_application_caches():
from core.caching.preview_cache import PreviewCache
from core.caching.exif_cache import ExifCache
from core.caching.rating_cache import RatingCache
+ from core.caching.analysis_cache import AnalysisCache
cache_classes = (
("thumbnail", ThumbnailCache),
@@ -80,6 +82,22 @@ def clear_application_caches():
except Exception:
logger.error("Error clearing similarity cache.", exc_info=True)
+ analysis_cache_instance = None
+ try:
+ analysis_cache_instance = AnalysisCache()
+ analysis_cache_instance.clear_all()
+ except Exception:
+ logger.error("Error clearing analysis cache.", exc_info=True)
+ finally:
+ if analysis_cache_instance is not None:
+ try:
+ analysis_cache_instance.close()
+ except Exception:
+ logger.error(
+ "Error closing analysis cache after clearing.",
+ exc_info=True,
+ )
+
logger.info(
f"Application caches cleared in {time.perf_counter() - start_time:.2f}s."
)
@@ -659,12 +677,21 @@ def start_ai_rating_all(self):
)
return
+ image_paths_to_rate, already_rated_count = self._partition_unrated_images(
+ image_paths
+ )
+ if not image_paths_to_rate:
+ self.main_window.statusBar().showMessage(
+ "All images already have ratings.", 4000
+ )
+ return
+
self.main_window.show_loading_overlay("Requesting AI ratings...")
self.main_window.menu_manager.ai_rate_images_action.setEnabled(False)
- self.main_window.statusBar().showMessage(
- f"AI rating started for {len(image_paths)} image(s)...",
- 4000,
- )
+ status_message = f"AI rating started for {len(image_paths_to_rate)} image(s)..."
+ if already_rated_count:
+ status_message += f" ({already_rated_count} already-rated image(s) skipped)"
+ self.main_window.statusBar().showMessage(status_message, 4000)
self._ai_rating_warning_messages = []
@@ -675,7 +702,7 @@ def start_ai_rating_all(self):
engine = None
self.worker_manager.start_ai_rating(
- image_paths=image_paths,
+ image_paths=image_paths_to_rate,
models_root=DEFAULT_MODELS_ROOT,
engine=engine,
)
@@ -734,6 +761,87 @@ def _calculate_folder_image_size(self, folder_path: str) -> int:
)
return total_size_bytes
+ def _get_existing_rating_for_path(self, image_path: str) -> Optional[int]:
+ normalized_path = os.path.normpath(image_path)
+ cached_rating = self._get_cached_rating(normalized_path)
+ if cached_rating is not None:
+ return cached_rating
+
+ metadata_rating = self._read_metadata_rating(normalized_path)
+ if metadata_rating is None:
+ self._cache_missing_rating(normalized_path)
+ return None
+
+ rating_int = self._normalize_rating_value(metadata_rating, normalized_path)
+ if rating_int is None:
+ return None
+
+ self._cache_rating(normalized_path, rating_int)
+ return rating_int
+
+ def _get_cached_rating(self, normalized_path: str) -> Optional[int]:
+ cached_rating = self.app_state.rating_cache.get(normalized_path)
+ if cached_rating is not None:
+ return int(cached_rating)
+ disk_cache = getattr(self.app_state, "rating_disk_cache", None)
+ if disk_cache:
+ disk_rating = disk_cache.get(normalized_path)
+ if disk_rating is not None:
+ rating_int = int(disk_rating)
+ self.app_state.rating_cache[normalized_path] = rating_int
+ return rating_int
+ return None
+
+ def _read_metadata_rating(self, normalized_path: str) -> Optional[float]:
+ try:
+ return PyExiv2Operations.get_rating(normalized_path)
+ except Exception:
+ logger.debug(
+ "Failed to read rating metadata for %s",
+ normalized_path,
+ exc_info=True,
+ )
+ return None
+
+ def _normalize_rating_value(
+ self, metadata_rating: object, normalized_path: str
+ ) -> Optional[int]:
+ try:
+ rating_int = int(round(float(metadata_rating)))
+ except (TypeError, ValueError):
+ logger.debug(
+ "Unexpected rating value for %s: %s",
+ normalized_path,
+ metadata_rating,
+ )
+ return None
+ return max(0, min(5, rating_int))
+
+ def _cache_rating(self, normalized_path: str, rating: int) -> None:
+ self.app_state.rating_cache[normalized_path] = rating
+ disk_cache = getattr(self.app_state, "rating_disk_cache", None)
+ if disk_cache:
+ disk_cache.set(normalized_path, rating)
+
+ def _cache_missing_rating(self, normalized_path: str) -> None:
+ self.app_state.rating_cache.setdefault(normalized_path, 0)
+ disk_cache = getattr(self.app_state, "rating_disk_cache", None)
+ if disk_cache:
+ disk_cache.set(normalized_path, 0)
+
+ def _partition_unrated_images(
+ self, image_paths: List[str]
+ ) -> Tuple[List[str], int]:
+ unrated: List[str] = []
+ already_rated_count = 0
+ for path in image_paths:
+ existing_rating = self._get_existing_rating_for_path(path)
+ if existing_rating is not None and existing_rating != 0:
+ already_rated_count += 1
+ continue
+ unrated.append(path)
+ return unrated, already_rated_count
+
def _start_preview_preloader(self, image_data_list: List[Dict[str, any]]):
logger.info(f"Starting preview preloader for {len(image_data_list)} images.")
if not image_data_list:
@@ -969,7 +1077,10 @@ def handle_similarity_error(self, message):
self.main_window.hide_loading_overlay()
def handle_best_shot_progress(self, percentage: int, message: str):
- self.main_window.update_loading_text(f"Best shots: {message} ({percentage}%)")
+ suffix = (
+ f" ({percentage}%)" if percentage is not None and percentage >= 0 else ""
+ )
+ self.main_window.update_loading_text(f"Best shots: {message}{suffix}")
def handle_best_shot_complete(
self, rankings_by_cluster: Dict[int, List[Dict[str, Any]]]
@@ -1039,10 +1150,10 @@ def handle_best_shot_models_missing(self, missing_models: list):
self.main_window.menu_manager.stop_best_shots_action.setEnabled(False)
def handle_ai_rating_progress(self, percentage: int, message: str):
- progress_text = message
- if percentage is not None:
- progress_text = f"{message} ({percentage}%)"
- self.main_window.update_loading_text(f"AI rating: {progress_text}")
+ suffix = (
+ f" ({percentage}%)" if percentage is not None and percentage >= 0 else ""
+ )
+ self.main_window.update_loading_text(f"AI rating: {message}{suffix}")
def handle_ai_rating_warning(self, message: str):
logger.warning("AI rating warning: %s", message)
diff --git a/src/ui/dialog_manager.py b/src/ui/dialog_manager.py
index 253c274..e17be87 100644
--- a/src/ui/dialog_manager.py
+++ b/src/ui/dialog_manager.py
@@ -264,11 +264,21 @@ def _header_mouse_move(e):
worker_manager = self.parent.app_controller.worker_manager
if embeddings_label_ref:
- def update_embeddings_label(available):
+ def update_embeddings_label(device_name: str):
+ device_key = (device_name or "cpu").lower()
+ friendly = {
+ "cuda": "GPU (CUDA)",
+ "mps": "GPU (Apple MPS)",
+ "cpu": "CPU",
+ }
+ label_text = friendly.get(
+ device_key,
+ device_key.upper(),
+ )
try:
if embeddings_label_ref:
embeddings_label_ref.setText(
- f"🧠 Embeddings: SentenceTransformer (CLIP) on {'GPU (CUDA)' if available else 'CPU'}"
+ f"🧠 Embeddings: SentenceTransformer (CLIP) on {label_text}"
)
except RuntimeError:
pass # Label has been deleted
diff --git a/src/ui/ui_components.py b/src/ui/ui_components.py
index 06b2236..af59d90 100644
--- a/src/ui/ui_components.py
+++ b/src/ui/ui_components.py
@@ -479,14 +479,14 @@ def run(self):
# --- CUDA Detection Worker ---
class CudaDetectionWorker(QObject):
- finished = pyqtSignal(bool) # cuda_available
+ finished = pyqtSignal(str) # torch_device
def run(self):
- from core.app_settings import is_pytorch_cuda_available
+ from core.app_settings import get_preferred_torch_device
try:
- available = is_pytorch_cuda_available()
- self.finished.emit(available)
+ device = get_preferred_torch_device()
except Exception as e:
- logger.error(f"Error during CUDA detection: {e}", exc_info=True)
- self.finished.emit(False) # default to CPU on error
+ logger.error(f"Error during torch device detection: {e}", exc_info=True)
+ device = "cpu"
+ self.finished.emit(device)
diff --git a/src/ui/worker_manager.py b/src/ui/worker_manager.py
index 2c3bda1..fb36338 100644
--- a/src/ui/worker_manager.py
+++ b/src/ui/worker_manager.py
@@ -88,7 +88,7 @@ class WorkerManager(QObject):
rotation_model_not_found = pyqtSignal(str) # model_path
# CUDA Detection Signals
- cuda_detection_finished = pyqtSignal(bool)
+ cuda_detection_finished = pyqtSignal(str)
# Update Check Signals
update_check_finished = pyqtSignal(
diff --git a/src/workers/ai_rating_worker.py b/src/workers/ai_rating_worker.py
index fabdf80..ba3871b 100644
--- a/src/workers/ai_rating_worker.py
+++ b/src/workers/ai_rating_worker.py
@@ -1,4 +1,5 @@
import logging
+import math
import os
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -16,10 +17,28 @@
create_best_shot_strategy,
)
from core.app_settings import calculate_max_workers, get_best_shot_engine
+from core.utils.time_utils import format_duration
logger = logging.getLogger(__name__)
+def _format_eta_suffix(processed: int, total: int, start_time: Optional[float]) -> str:
+ if start_time is None or processed <= 0 or total <= 0 or processed > total:
+ return ""
+ remaining = total - processed
+ if remaining <= 0:
+ return "ETA 0s"
+ elapsed = time.perf_counter() - start_time
+ if elapsed <= 0:
+ return ""
+ per_item = elapsed / processed
+ eta_seconds = per_item * remaining
+ if not math.isfinite(eta_seconds) or eta_seconds < 0:
+ return ""
+ eta_text = format_duration(eta_seconds)
+ return f"ETA {eta_text}" if eta_text else ""
+
+
class AiRatingWorker(QObject):
"""Background worker that requests AI ratings (1-5) for images."""
@@ -59,6 +78,13 @@ def __init__(
def stop(self) -> None:
self._should_stop = True
+ def _emit_status_message(self, message: str) -> None:
+ logger.info("AI rating status: %s", message)
+ try:
+ self.progress_update.emit(-1, message)
+ except Exception:
+ logger.debug("Failed to emit AI rating status", exc_info=True)
+
def _ensure_strategy(self) -> None:
if self._strategy is None:
self._strategy = create_best_shot_strategy(
@@ -66,6 +92,7 @@ def _ensure_strategy(self) -> None:
models_root=self.models_root,
image_pipeline=self._image_pipeline,
llm_config=self._llm_config,
+ status_callback=self._emit_status_message,
)
if self._strategy.max_workers:
self._max_workers = min(
@@ -152,6 +179,7 @@ def run(self) -> None:
}
processed = 0
+ start_time = time.perf_counter()
for future in as_completed(futures):
if self._should_stop:
logger.info(
@@ -187,7 +215,11 @@ def run(self) -> None:
results[path] = rating_data
processed += 1
percent = int((processed / total) * 100)
- self.progress_update.emit(percent, f"Rated {processed}/{total}")
+ eta_suffix = _format_eta_suffix(processed, total, start_time)
+ message = f"Rated {processed}/{total}"
+ if eta_suffix:
+ message = f"{message} - {eta_suffix}"
+ self.progress_update.emit(percent, message)
if not self._should_stop:
logger.info(
diff --git a/tests/test_best_photo_selector.py b/tests/test_best_photo_selector.py
index 8a13e9b..d6a8683 100644
--- a/tests/test_best_photo_selector.py
+++ b/tests/test_best_photo_selector.py
@@ -1,151 +1,197 @@
from __future__ import annotations
-import numpy as np
-from PIL import Image
+import os
+from pathlib import Path
+from typing import Dict
+from urllib.parse import urlparse
-from core.ai.best_photo_selector import (
- BestPhotoSelector,
- FaceDetectionResult,
- QualityScore,
- _default_focus_score,
-)
-
-
-def _make_detection() -> FaceDetectionResult:
- return FaceDetectionResult(
- score=0.92,
- bbox=(10, 10, 90, 90),
- bbox_normalized=(0.1, 0.1, 0.9, 0.9),
- keypoints=[
- (0.3, 0.4),
- (0.7, 0.4),
- (0.5, 0.55),
- (0.5, 0.65),
- (0.25, 0.5),
- (0.75, 0.5),
- ],
- image_size=(100, 100),
- )
+from PIL import Image
+from core.ai.best_photo_selector import BestPhotoSelector, MetricSpec
-class DummyFaceDetector:
- def __init__(self, mapping):
- self.mapping = mapping
- def detect_faces(self, image, image_path=None, max_faces=None):
- return list(self.mapping.get(image_path, []))
+def _loader_factory():
+ def _loader(image_path: str) -> Image.Image:
+ img = Image.new("RGB", (32, 32), color="white")
+ img.info["source_path"] = image_path
+ return img
+ return _loader
-class DummyEyeClassifier:
- def __init__(self, mapping):
- self.mapping = mapping
- def predict_open_probability(self, eye_image, image_path=None):
- path = image_path or eye_image.info.get("source_path")
- return float(self.mapping.get(path, 0.5))
+def _scorer(scores: Dict[str, float]):
+ def _score(image: Image.Image) -> float:
+ path = image.info["source_path"]
+ return scores[path]
+ return _score
-class DummyQualityModel:
- def __init__(self, full_scores, face_scores):
- self.full_scores = full_scores
- self.face_scores = face_scores
- def score(self, image, return_embedding=False):
- path = image.info.get("source_path")
- region = image.info.get("region", "full")
- table = self.face_scores if region == "face" else self.full_scores
- result = table[path]
- return result
+def test_selector_ranks_images_by_weighted_iqa(tmp_path):
+ img_a = str(tmp_path / "a.jpg")
+ img_b = str(tmp_path / "b.jpg")
+ metric_specs = (
+ MetricSpec(name="musiq", weight=0.6, min_score=0.0, max_score=100.0),
+ MetricSpec(name="maniqa", weight=0.4, min_score=0.0, max_score=1.0),
+ )
+ metric_factories = {
+ "musiq": _scorer({img_a: 82.0, img_b: 78.0}),
+ "maniqa": _scorer({img_a: 0.85, img_b: 0.35}),
+ }
-def _loader_factory():
- def _loader(image_path: str) -> Image.Image:
- img = Image.new("RGB", (100, 100), color="white")
- img.info["source_path"] = image_path
- img.info["region"] = "full"
- return img
+ selector = BestPhotoSelector(
+ metric_specs=metric_specs,
+ metric_factories=metric_factories,
+ image_loader=_loader_factory(),
+ enable_eye_detection=False,
+ )
- return _loader
+ results = selector.rank_images([img_b, img_a])
+ assert [r.image_path for r in results] == [img_a, img_b]
+ assert results[0].metrics["musiq"] > results[1].metrics["musiq"]
+ assert results[0].metrics["maniqa"] > results[1].metrics["maniqa"]
-def test_selector_prefers_open_eyes_and_subject_focus(tmp_path):
+def test_selector_clamps_scores_outside_known_range(tmp_path):
img_a = str(tmp_path / "a.jpg")
img_b = str(tmp_path / "b.jpg")
- tmp_path.joinpath("a.jpg").write_text("a")
- tmp_path.joinpath("b.jpg").write_text("b")
-
- face_detector = DummyFaceDetector({img_a: [_make_detection()]})
- eye_classifier = DummyEyeClassifier({img_a: 0.9})
- full_scores = {
- img_a: QualityScore(raw=8.5, normalized=0.83, embedding=np.array([1.0, 0.0])),
- img_b: QualityScore(raw=7.0, normalized=0.66, embedding=np.array([0.0, 1.0])),
- }
- face_scores = {
- img_a: QualityScore(raw=7.2, normalized=0.7, embedding=np.array([0.9, 0.1])),
+ metric_specs = (
+ MetricSpec(name="liqe", weight=1.0, min_score=0.0, max_score=100.0),
+ )
+ metric_factories = {
+ "liqe": _scorer({img_a: 150.0, img_b: -10.0}),
}
- def focus_metric(image: Image.Image) -> float:
- region = image.info.get("region", "full")
- path = image.info.get("source_path")
- if region == "face" and path == img_a:
- return 0.8
- if region == "full" and path == img_a:
- return 0.75
- return 0.55
-
selector = BestPhotoSelector(
- face_detector=face_detector,
- eye_classifier=eye_classifier,
- quality_model=DummyQualityModel(full_scores, face_scores),
+ metric_specs=metric_specs,
+ metric_factories=metric_factories,
image_loader=_loader_factory(),
- focus_metric_fn=focus_metric,
+ enable_eye_detection=False,
)
results = selector.rank_images([img_b, img_a])
- assert [r.image_path for r in results] == [img_a, img_b]
- assert results[0].metrics["eyes_open"] == 0.9
- assert "framing" in results[0].metrics
- assert "eyes_open" not in results[1].metrics
+ assert results[0].metrics["liqe"] == 1.0 # clamped upper bound
+ assert results[1].metrics["liqe"] == 0.0 # clamped lower bound
-def test_selector_handles_images_without_faces(tmp_path):
+def test_selector_handles_partial_metric_failures(tmp_path):
img_a = str(tmp_path / "a.jpg")
img_b = str(tmp_path / "b.jpg")
- tmp_path.joinpath("a.jpg").write_text("a")
- tmp_path.joinpath("b.jpg").write_text("b")
- full_scores = {
- img_a: QualityScore(raw=7.5, normalized=0.7, embedding=None),
- img_b: QualityScore(raw=6.5, normalized=0.6, embedding=None),
- }
+ musiq_scores = {img_a: 75.0, img_b: 80.0}
- def focus_metric(image: Image.Image) -> float:
- return 0.8 if image.info.get("source_path") == img_a else 0.5
+ def flaky_maniqa(image: Image.Image) -> float:
+ if image.info["source_path"] == img_b:
+ raise RuntimeError("simulated metric failure")
+ return 0.9
selector = BestPhotoSelector(
- face_detector=DummyFaceDetector({}),
- eye_classifier=DummyEyeClassifier({}),
- quality_model=DummyQualityModel(full_scores, {}),
+ metric_specs=(
+ MetricSpec(name="musiq", weight=0.5, min_score=0.0, max_score=100.0),
+ MetricSpec(name="maniqa", weight=0.5, min_score=0.0, max_score=1.0),
+ ),
+ metric_factories={
+ "musiq": _scorer(musiq_scores),
+ "maniqa": flaky_maniqa,
+ },
image_loader=_loader_factory(),
- focus_metric_fn=focus_metric,
+ enable_eye_detection=False,
)
results = selector.rank_images([img_b, img_a])
- assert [r.image_path for r in results] == [img_a, img_b]
+ assert len(results) == 2
+ maniqa_present = {
+ result.image_path: ("maniqa" in result.metrics) for result in results
+ }
+ assert maniqa_present[img_a] is True
+ assert maniqa_present[img_b] is False
+ musiq_metrics = {result.image_path: result.metrics["musiq"] for result in results}
+ assert musiq_metrics[img_a] == 0.75
+ assert musiq_metrics[img_b] == 0.8
+
+
+def test_selector_notifies_weight_download(monkeypatch, tmp_path):
+ import torch
+ import pyiqa # type: ignore
+ import pyiqa.utils.download_util as download_util # type: ignore
+
+ messages: list[str] = []
+
+ def status_cb(message: str) -> None:
+ messages.append(message)
+
+ cache_dir = tmp_path / "pyiqa"
+ monkeypatch.setattr(
+ download_util, "DEFAULT_CACHE_DIR", str(cache_dir), raising=False
+ )
+
+ def fake_loader(url, model_dir=None, progress=True, file_name=None):
+ target_dir = model_dir or str(cache_dir)
+ os.makedirs(target_dir, exist_ok=True)
+ filename = file_name or os.path.basename(urlparse(url).path)
+ destination = os.path.join(target_dir, filename)
+ Path(destination).write_bytes(b"weights")
+ return destination
+
+ monkeypatch.setattr(download_util, "load_file_from_url", fake_loader)
+
+ class DummyMetric:
+ def eval(self):
+ return self
+
+ def __call__(self, tensor):
+ return torch.tensor([0.5])
+
+ def fake_create_metric(*_, **__):
+ download_util.load_file_from_url(
+ "https://example.com/musiq_koniq_ckpt-e95806b9.pth"
+ )
+ return DummyMetric()
- result = results[0]
- assert result.metrics["technical"] == 0.8
- assert "eyes_open" not in result.metrics
- assert "framing" not in result.metrics
+ monkeypatch.setattr(pyiqa, "create_metric", fake_create_metric)
- # Only aesthetic + technical contribute (equal weight in normalization)
- expected = (0.7 + 0.8) / 2.0
- assert abs(result.composite_score - expected) < 1e-6
+ selector = BestPhotoSelector(
+ image_loader=_loader_factory(),
+ metric_specs=(
+ MetricSpec(name="musiq", weight=1.0, min_score=0.0, max_score=1.0),
+ ),
+ metric_factories={},
+ status_callback=status_cb,
+ enable_eye_detection=False,
+ )
+
+ img_path = str(tmp_path / "a.jpg")
+ results = selector.rank_images([img_path])
+ assert results
+ assert any("Downloading MUSIQ" in msg for msg in messages)
+ assert any("MUSIQ weights cached" in msg for msg in messages)
+
+
+def test_eye_open_probability_influences_ranking(tmp_path):
+ img_a = str(tmp_path / "closed.jpg")
+ img_b = str(tmp_path / "open.jpg")
+
+ class EyeStub:
+ def __init__(self, mapping):
+ self.mapping = mapping
+ def predict_open_probability(self, image: Image.Image):
+ return self.mapping.get(image.info["source_path"], 0.5)
-def test_default_focus_score_handles_uint16_image():
- data = np.random.randint(0, 65535, (12, 12), dtype=np.uint16)
- img = Image.fromarray(data, mode="I;16")
- score = _default_focus_score(img)
- assert 0.0 <= score <= 1.0
+ metric_specs = (
+ MetricSpec(name="musiq", weight=1.0, min_score=0.0, max_score=100.0),
+ )
+ constant_scores = {img_a: 60.0, img_b: 60.0}
+
+ selector = BestPhotoSelector(
+ metric_specs=metric_specs,
+ metric_factories={"musiq": _scorer(constant_scores)},
+ image_loader=_loader_factory(),
+ eye_state_analyzer=EyeStub({img_a: 0.1, img_b: 0.9}),
+ )
+
+ results = selector.rank_images([img_a, img_b])
+ assert [r.image_path for r in results] == [img_b, img_a]
+ assert results[0].metrics["eyes_open"] == 0.9
diff --git a/tests/test_best_shot_analysis_profile.py b/tests/test_best_shot_analysis_profile.py
new file mode 100644
index 0000000..0ee88bc
--- /dev/null
+++ b/tests/test_best_shot_analysis_profile.py
@@ -0,0 +1,37 @@
+from core.ai.best_photo_selector import DEFAULT_METRIC_SPECS
+from core.ai.best_shot_pipeline import (
+ MAX_LOCAL_ANALYSIS_EDGE,
+ RESPONSIVE_LOCAL_ANALYSIS_EDGE,
+ select_local_analysis_profile,
+)
+from core.app_settings import PerformanceMode
+
+
+def _metric_names(profile) -> tuple[str, ...]:
+ return tuple(spec.name for spec in profile.metric_specs)
+
+
+def test_balanced_mode_keeps_full_quality_stack():
+ profile = select_local_analysis_profile(PerformanceMode.BALANCED)
+ assert profile.max_edge == MAX_LOCAL_ANALYSIS_EDGE
+ expected = tuple(spec.name for spec in DEFAULT_METRIC_SPECS)
+ assert _metric_names(profile) == expected
+
+
+def test_performance_mode_keeps_full_quality_stack():
+ profile = select_local_analysis_profile(PerformanceMode.PERFORMANCE)
+ assert profile.max_edge == MAX_LOCAL_ANALYSIS_EDGE
+ expected = tuple(spec.name for spec in DEFAULT_METRIC_SPECS)
+ assert _metric_names(profile) == expected
+
+
+def test_custom_mode_uses_ratio_threshold():
+ high_ratio_profile = select_local_analysis_profile(
+ PerformanceMode.CUSTOM, custom_thread_ratio=0.99
+ )
+ assert high_ratio_profile.max_edge == MAX_LOCAL_ANALYSIS_EDGE
+
+ low_ratio_profile = select_local_analysis_profile(
+ PerformanceMode.CUSTOM, custom_thread_ratio=0.5
+ )
+ assert low_ratio_profile.max_edge == RESPONSIVE_LOCAL_ANALYSIS_EDGE
diff --git a/tests/test_best_shot_model_checker.py b/tests/test_best_shot_model_checker.py
index bdc521f..acbc4bf 100644
--- a/tests/test_best_shot_model_checker.py
+++ b/tests/test_best_shot_model_checker.py
@@ -1,9 +1,7 @@
-"""Tests for best-shot model dependency checker."""
+"""Tests for the best-shot dependency checker."""
from __future__ import annotations
-import os
-
import pytest
from core.ai.model_checker import (
@@ -14,221 +12,74 @@
)
-@pytest.fixture
-def temp_models_root(tmp_path):
- """Create a temporary models directory."""
- models_dir = tmp_path / "models"
- models_dir.mkdir()
- return str(models_dir)
-
-
-def test_check_all_models_present(temp_models_root):
- """Test that check passes when all models are present."""
- import numpy as np
-
- # Create all required model directories and files
- face_dir = os.path.join(temp_models_root, "job_jgzjewkop_optimized_onnx")
- os.makedirs(face_dir)
- with open(os.path.join(face_dir, "model.onnx"), "w") as f:
- f.write("fake model")
-
- eye_dir = os.path.join(temp_models_root, "open-closed-eye-classification-mobilev2")
- os.makedirs(eye_dir)
-
- aesthetic_dir = os.path.join(temp_models_root, "aesthetic_predictor")
- os.makedirs(aesthetic_dir)
-
- # Create anchors in the models root
- np.save(
- os.path.join(temp_models_root, "blazeface_anchors.npy"), np.array([[1, 2, 3]])
- )
-
- missing = check_best_shot_models(temp_models_root)
- assert len(missing) == 0
-
-
-def test_check_all_models_missing(temp_models_root):
- """Test that all models are reported missing when none are present."""
- missing = check_best_shot_models(temp_models_root)
-
- # Should find 4 missing models (face, eye, aesthetic, anchors)
- assert len(missing) >= 3 # At least 3, anchors might be bundled
- model_names = {m.name for m in missing}
- assert "Face Detector" in model_names
- assert "Eye Classifier" in model_names
- assert "Aesthetic Predictor" in model_names
-
-
-def test_check_face_detector_missing(temp_models_root):
- """Test face detector missing detection."""
- import numpy as np
-
- # Create only eye classifier and aesthetic predictor
- eye_dir = os.path.join(temp_models_root, "open-closed-eye-classification-mobilev2")
- os.makedirs(eye_dir)
-
- aesthetic_dir = os.path.join(temp_models_root, "aesthetic_predictor")
- os.makedirs(aesthetic_dir)
-
- # Create anchors
- np.save(
- os.path.join(temp_models_root, "blazeface_anchors.npy"), np.array([[1, 2, 3]])
+def test_check_best_shot_models_all_present(monkeypatch):
+ monkeypatch.setattr(
+ "core.ai.model_checker._module_available",
+ lambda name: True,
)
- missing = check_best_shot_models(temp_models_root)
+ missing = check_best_shot_models()
- assert len(missing) == 1
- assert missing[0].name == "Face Detector"
- assert "qualcomm/MediaPipe-Face-Detection" in missing[0].download_url
-
-
-def test_check_eye_classifier_missing(temp_models_root):
- """Test eye classifier missing detection."""
- import numpy as np
+ assert missing == []
- # Create only face detector and aesthetic predictor
- face_dir = os.path.join(temp_models_root, "job_jgzjewkop_optimized_onnx")
- os.makedirs(face_dir)
- with open(os.path.join(face_dir, "model.onnx"), "w") as f:
- f.write("fake model")
- aesthetic_dir = os.path.join(temp_models_root, "aesthetic_predictor")
- os.makedirs(aesthetic_dir)
+def test_check_best_shot_models_missing_pyiqa(monkeypatch):
+ def fake_availability(name: str) -> bool:
+ return name != "pyiqa"
- # Create anchors
- np.save(
- os.path.join(temp_models_root, "blazeface_anchors.npy"), np.array([[1, 2, 3]])
+ monkeypatch.setattr(
+ "core.ai.model_checker._module_available",
+ fake_availability,
)
- missing = check_best_shot_models(temp_models_root)
+ missing = check_best_shot_models()
assert len(missing) == 1
- assert missing[0].name == "Eye Classifier"
- assert "MichalMlodawski" in missing[0].download_url
-
-
-def test_check_aesthetic_predictor_missing(temp_models_root):
- """Test aesthetic predictor missing detection."""
- import numpy as np
+ assert "pyiqa" in missing[0].name.lower()
- # Create only face detector and eye classifier
- face_dir = os.path.join(temp_models_root, "job_jgzjewkop_optimized_onnx")
- os.makedirs(face_dir)
- with open(os.path.join(face_dir, "model.onnx"), "w") as f:
- f.write("fake model")
- eye_dir = os.path.join(temp_models_root, "open-closed-eye-classification-mobilev2")
- os.makedirs(eye_dir)
-
- # Create anchors
- np.save(
- os.path.join(temp_models_root, "blazeface_anchors.npy"), np.array([[1, 2, 3]])
+def test_ensure_best_shot_models_raises(monkeypatch):
+ monkeypatch.setattr(
+ "core.ai.model_checker._module_available",
+ lambda name: False,
)
- missing = check_best_shot_models(temp_models_root)
-
- assert len(missing) == 1
- assert missing[0].name == "Aesthetic Predictor"
- assert "shunk031" in missing[0].download_url
-
-
-def test_ensure_models_raises_on_missing(temp_models_root):
- """Test that ensure_best_shot_models raises ModelDependencyError."""
with pytest.raises(ModelDependencyError) as excinfo:
- ensure_best_shot_models(temp_models_root)
-
- assert len(excinfo.value.missing_models) >= 3
- assert "Face Detector" in str(excinfo.value)
-
-
-def test_ensure_models_passes_when_present(temp_models_root):
- """Test that ensure_best_shot_models doesn't raise when all models present."""
- import numpy as np
-
- # Create all required model directories and files
- face_dir = os.path.join(temp_models_root, "job_jgzjewkop_optimized_onnx")
- os.makedirs(face_dir)
- with open(os.path.join(face_dir, "model.onnx"), "w") as f:
- f.write("fake model")
-
- eye_dir = os.path.join(temp_models_root, "open-closed-eye-classification-mobilev2")
- os.makedirs(eye_dir)
-
- aesthetic_dir = os.path.join(temp_models_root, "aesthetic_predictor")
- os.makedirs(aesthetic_dir)
+ ensure_best_shot_models()
- # Create anchors
- np.save(
- os.path.join(temp_models_root, "blazeface_anchors.npy"), np.array([[1, 2, 3]])
- )
-
- # Should not raise
- ensure_best_shot_models(temp_models_root)
+ assert excinfo.value.missing_models
+ assert "Required best-shot dependencies not found" in str(excinfo.value)
def test_missing_model_info_structure():
- """Test the structure of MissingModelInfo."""
info = MissingModelInfo(
- name="Test Model",
- description="A test model",
- expected_path="/path/to/model",
+ name="Dependency",
+ description="A dependency",
+ expected_path="pip install something",
download_url="https://example.com",
)
- assert info.name == "Test Model"
- assert info.description == "A test model"
- assert info.expected_path == "/path/to/model"
- assert info.download_url == "https://example.com"
+ assert info.name == "Dependency"
+ assert "dependency" in info.description.lower()
def test_model_dependency_error_message():
- """Test that ModelDependencyError formats message correctly."""
missing = [
MissingModelInfo(
- name="Model A",
- description="First model",
- expected_path="/path/a",
- download_url="https://a.com",
+ name="torch",
+ description="",
+ expected_path="",
+ download_url="",
),
MissingModelInfo(
- name="Model B",
- description="Second model",
- expected_path="/path/b",
- download_url="https://b.com",
+ name="pyiqa",
+ description="",
+ expected_path="",
+ download_url="",
),
]
error = ModelDependencyError(missing)
- assert "Model A" in str(error)
- assert "Model B" in str(error)
- assert len(error.missing_models) == 2
-
-
-def test_alternative_face_detector_path(temp_models_root):
- """Test that alternative face detector path is recognized."""
- import numpy as np
-
- # Create face detector in alternative location
- face_dir = os.path.join(
- temp_models_root, "MediaPipe-Face-Detection_FaceDetector_float"
- )
- os.makedirs(face_dir)
- with open(os.path.join(face_dir, "model.onnx"), "w") as f:
- f.write("fake model")
-
- eye_dir = os.path.join(temp_models_root, "open-closed-eye-classification-mobilev2")
- os.makedirs(eye_dir)
-
- aesthetic_dir = os.path.join(temp_models_root, "aesthetic_predictor")
- os.makedirs(aesthetic_dir)
-
- # Create anchors
- np.save(
- os.path.join(temp_models_root, "blazeface_anchors.npy"), np.array([[1, 2, 3]])
- )
-
- missing = check_best_shot_models(temp_models_root)
- # Should not report face detector as missing
- model_names = {m.name for m in missing}
- assert "Face Detector" not in model_names
+ assert "torch" in str(error)
+ assert "pyiqa" in str(error)
diff --git a/tests/test_best_shot_rating_scale.py b/tests/test_best_shot_rating_scale.py
new file mode 100644
index 0000000..3535cd6
--- /dev/null
+++ b/tests/test_best_shot_rating_scale.py
@@ -0,0 +1,35 @@
+from core.ai.best_photo_selector import BestShotResult
+from core.ai.best_shot_pipeline import _compute_quality_rating
+
+
+def _make_result(
+ musiq: float, maniqa: float, liqe: float, composite: float
+) -> BestShotResult:
+ return BestShotResult(
+ image_path="dummy.jpg",
+ composite_score=composite,
+ metrics={},
+ raw_metrics={
+ "musiq_raw": musiq,
+ "maniqa_raw": maniqa,
+ "liqe_raw": liqe,
+ },
+ )
+
+
+def test_quality_rating_spreads_scores():
+ poor = _make_result(20.0, 0.2, 25.0, 0.2)
+ rich = _make_result(85.0, 0.9, 90.0, 0.9)
+
+ poor_rating, poor_score = _compute_quality_rating(poor)
+ rich_rating, rich_score = _compute_quality_rating(rich)
+
+ assert poor_rating <= 2
+ assert rich_rating == 5
+ assert poor_score < 0.3 < rich_score
+
+
+def test_mid_quality_maps_to_four():
+ mid = _make_result(55.0, 0.45, 60.0, 0.5)
+ rating, _ = _compute_quality_rating(mid)
+ assert rating == 4
diff --git a/tests/test_clear_application_caches.py b/tests/test_clear_application_caches.py
new file mode 100644
index 0000000..205d6cb
--- /dev/null
+++ b/tests/test_clear_application_caches.py
@@ -0,0 +1,61 @@
+from ui.app_controller import AppController
+
+
+def _make_fake_cache(label: str, calls: list[str]):
+ class _Cache:
+ def __init__(self):
+ calls.append(f"{label}_init")
+
+ def clear(self):
+ calls.append(f"{label}_clear")
+
+ def close(self):
+ calls.append(f"{label}_close")
+
+ return _Cache
+
+
+def test_clear_application_caches_clears_every_cache(monkeypatch):
+ calls: list[str] = []
+
+ for module_path, class_name in (
+ ("core.caching.thumbnail_cache", "ThumbnailCache"),
+ ("core.caching.preview_cache", "PreviewCache"),
+ ("core.caching.exif_cache", "ExifCache"),
+ ("core.caching.rating_cache", "RatingCache"),
+ ):
+ label = class_name.replace("Cache", "").lower()
+ fake_cls = _make_fake_cache(label, calls)
+ monkeypatch.setattr(f"{module_path}.{class_name}", fake_cls)
+
+ class FakeAnalysisCache:
+ def __init__(self):
+ calls.append("analysis_init")
+
+ def clear_all(self):
+ calls.append("analysis_clear_all")
+
+ def close(self):
+ calls.append("analysis_close")
+
+ monkeypatch.setattr(
+ "core.caching.analysis_cache.AnalysisCache",
+ FakeAnalysisCache,
+ )
+
+ def fake_clear_embedding_cache():
+ calls.append("similarity_clear_embeddings")
+
+ monkeypatch.setattr(
+ "core.similarity_engine.SimilarityEngine.clear_embedding_cache",
+ staticmethod(fake_clear_embedding_cache),
+ )
+
+ AppController.clear_application_caches()
+
+ assert "analysis_clear_all" in calls
+ assert "similarity_clear_embeddings" in calls
+
+ for cache_name in ("thumbnail", "preview", "exif", "rating"):
+ assert f"{cache_name}_clear" in calls
+ assert f"{cache_name}_close" in calls
diff --git a/tests/test_similarity_engine_helpers.py b/tests/test_similarity_engine_helpers.py
new file mode 100644
index 0000000..cf45c60
--- /dev/null
+++ b/tests/test_similarity_engine_helpers.py
@@ -0,0 +1,60 @@
+import numpy as np
+import pytest
+
+pytest.importorskip("sklearn")
+
+from core.similarity_utils import (
+ adaptive_dbscan_eps,
+ l2_normalize_rows,
+ normalize_embedding_vector,
+)
+
+
+def test_l2_normalize_rows_produces_unit_norm_rows():
+ data = np.array([[3.0, 4.0], [1.0, 1.0], [0.0, 0.0]], dtype=np.float32)
+ normalized = l2_normalize_rows(data.copy())
+ norms = np.linalg.norm(normalized[:2], axis=1)
+ assert np.allclose(norms, np.ones_like(norms), atol=1e-6)
+ # Zero vector remains zero after normalization
+ assert np.allclose(normalized[2], np.zeros_like(normalized[2]))
+
+
+def test_normalize_embedding_vector_flags_updates():
+ vec = [2.0, 0.0]
+ normalized, changed = normalize_embedding_vector(vec)
+ assert changed is True
+ assert np.allclose(np.linalg.norm(normalized), 1.0, atol=1e-6)
+
+ already_unit = [1.0, 0.0]
+ normalized_same, changed_same = normalize_embedding_vector(already_unit)
+ assert changed_same is False
+ assert normalized_same == already_unit
+
+
+def test_adaptive_eps_distinguishes_dense_and_sparse_sets():
+ dense = np.vstack(
+ [np.ones(8, dtype=np.float32), np.ones(8, dtype=np.float32) * 1.01]
+ )
+ dense = l2_normalize_rows(dense)
+ sparse = np.eye(8, dtype=np.float32)
+ sparse = l2_normalize_rows(sparse)
+ base_eps = 0.05
+ dense_eps = adaptive_dbscan_eps(dense, base_eps, min_samples=2)
+ sparse_eps = adaptive_dbscan_eps(sparse, base_eps, min_samples=2)
+
+ assert 0.005 <= dense_eps <= 0.3
+ assert 0.005 <= sparse_eps <= 0.3
+ assert dense_eps <= sparse_eps
+
+
+def test_adaptive_eps_respects_min_samples_neighbor():
+ rng = np.random.default_rng(0)
+ cluster_a = rng.normal(scale=1e-3, size=(3, 8)).astype(np.float32)
+ cluster_a[:, 0] += 1.0
+ cluster_b = rng.normal(scale=1e-3, size=(3, 8)).astype(np.float32)
+ cluster_b[:, 1] += 1.0
+ data = np.vstack([cluster_a, cluster_b])
+ data = l2_normalize_rows(data)
+ base_eps = 0.05
+ eps = adaptive_dbscan_eps(data, base_eps, min_samples=3)
+ assert eps < 0.2
diff --git a/workers/best_shot_worker.py b/workers/best_shot_worker.py
index b043654..0822a2a 100644
--- a/workers/best_shot_worker.py
+++ b/workers/best_shot_worker.py
@@ -1,5 +1,7 @@
import logging
+import math
import os
+import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterable, List, Optional, Sequence, TYPE_CHECKING
@@ -20,10 +22,40 @@
get_best_shot_engine,
get_best_shot_batch_size,
)
+from core.utils.time_utils import format_duration
logger = logging.getLogger(__name__)
+def _estimate_eta_seconds(
+ processed: int, total: int, start_time: Optional[float]
+) -> Optional[float]:
+ if start_time is None or processed <= 0 or total <= 0 or processed > total:
+ return None
+ remaining = total - processed
+ if remaining <= 0:
+ return 0.0
+ elapsed = time.perf_counter() - start_time
+ if elapsed <= 0:
+ return None
+ per_item = elapsed / processed
+ eta = per_item * remaining
+ return eta if math.isfinite(eta) and eta >= 0 else None
+
+
+def _build_progress_detail(
+ processed: int, total: int, start_time: Optional[float]
+) -> str:
+ eta_seconds = _estimate_eta_seconds(processed, total, start_time)
+ base = f"{processed}/{total} done"
+ if eta_seconds is None:
+ return base
+ eta_text = format_duration(eta_seconds)
+ if not eta_text:
+ return base
+ return f"{base}, ETA {eta_text}"
+
+
class BestShotWorker(QObject):
"""Background worker that ranks images per similarity cluster."""
@@ -76,6 +108,13 @@ def _normalize_detail(exc: Exception) -> str:
message = str(exc).strip()
return message or exc.__class__.__name__
+ def _emit_status_message(self, message: str) -> None:
+ logger.info("Best-shot status: %s", message)
+ try:
+ self.progress_update.emit(-1, message)
+ except Exception:
+ logger.debug("Failed to emit status message", exc_info=True)
+
@staticmethod
def _looks_like_connectivity_issue(message: str) -> bool:
lowered = message.lower()
@@ -146,6 +185,7 @@ def _ensure_strategy(self):
models_root=self.models_root,
image_pipeline=self._image_pipeline,
llm_config=self._llm_config,
+ status_callback=self._emit_status_message,
)
if self._strategy.max_workers:
self._max_workers = min(
@@ -380,6 +420,7 @@ def run(self):
return
processed = 0
+ start_time = time.perf_counter()
for future in as_completed(futures):
if self._should_stop:
logger.info(
@@ -411,10 +452,16 @@ def run(self):
if cluster_results
else "No result"
)
- self.progress_update.emit(
- percent,
- f"Cluster {cluster_id}: best candidate {os.path.basename(best_path)}",
+ progress_detail = _build_progress_detail(
+ processed,
+ total_jobs,
+ start_time,
+ )
+ progress_message = (
+ f"Cluster {cluster_id}: best candidate {os.path.basename(best_path)}"
+ f" - {progress_detail}"
)
+ self.progress_update.emit(percent, progress_message)
if not self._should_stop:
total_results = sum(len(results) for results in results.values())