-
Notifications
You must be signed in to change notification settings - Fork 26
Expand file tree
/
Copy pathfactory.py
More file actions
463 lines (395 loc) · 17.6 KB
/
factory.py
File metadata and controls
463 lines (395 loc) · 17.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
"""Backend discovery and construction utilities."""
# dlclivegui/cameras/factory.py
from __future__ import annotations
import importlib
import logging
import pkgutil
from collections.abc import Callable, Iterable
from contextlib import contextmanager
from dataclasses import dataclass
from os import environ
from ..config import CameraSettings
from .base import _BACKEND_REGISTRY, DEFAULT_CAPABILITIES, CameraBackend, SupportLevel
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Import failures recorded while loading backend packages/modules.
# Maps the package or module name to a "ExceptionType: message" string.
_BACKEND_IMPORT_ERRORS: dict[str, str] = {}
@dataclass
class DetectedCamera:
    """Record describing one camera found during discovery.

    Only ``index`` and ``label`` are required. The remaining fields are
    optional identity details that default to ``None`` when unknown.
    """

    # Required: the device index and a human readable label.
    index: int
    label: str

    # Optional identity details, useful for quick and robust discovery.
    device_id: str | None = None
    vid: int | None = None  # presumably the USB vendor id; confirm with backends
    pid: int | None = None  # presumably the USB product id; confirm with backends
    path: str | None = None
    # Backend-specific hint, e.g. cv2.CAP_DSHOW.
    backend_hint: int | None = None
def _opencv_get_log_level(cv2):
"""Return OpenCV log level using new utils.logging API when available, else legacy."""
# Preferred (OpenCV ≥ 4.x): cv2.utils.logging.getLogLevel()
try:
return cv2.utils.logging.getLogLevel()
except Exception:
# Legacy (older OpenCV): cv2.getLogLevel()
try:
return cv2.getLogLevel()
except Exception:
return None # unknown / not supported
def _opencv_set_log_level(cv2, level: int):
"""Set OpenCV log level using new utils.logging API when available, else legacy."""
# Preferred (OpenCV ≥ 4.x): cv2.utils.logging.setLogLevel(level)
try:
cv2.utils.logging.setLogLevel(level)
return
except Exception:
# Legacy (older OpenCV): cv2.setLogLevel(level)
try:
cv2.setLogLevel(level)
except Exception:
pass # not supported on this build
@contextmanager
def _suppress_opencv_logging():
"""Temporarily suppress OpenCV logging during camera probing (backwards compatible)."""
try:
import cv2
# Resolve a 'silent' level cross-version.
# In newer OpenCV it's 0 (LOG_LEVEL_SILENT).
SILENT = 0
old_level = _opencv_get_log_level(cv2)
_opencv_set_log_level(cv2, SILENT)
try:
yield
finally:
# Restore if we were able to read it
if old_level is not None:
_opencv_set_log_level(cv2, int(old_level))
except ImportError:
# OpenCV not installed; nothing to suppress
yield
# Lazy loader for backends (ensures @register_backend runs)
_BUILTIN_BACKEND_PACKAGES = (
"dlclivegui.cameras.backends", # import every submodule once
)
_BACKENDS_IMPORTED = False
def _ensure_backends_loaded() -> None:
    """Import every built-in backend module once so registration decorators run.

    Each package in ``_BUILTIN_BACKEND_PACKAGES`` is imported, then every
    submodule found by ``pkgutil.iter_modules``. Import failures are recorded
    in ``_BACKEND_IMPORT_ERRORS`` and logged with a traceback. They are
    re-raised only when the ``DLC_CAMERA_BACKENDS_STRICT_IMPORT`` environment
    variable is set to ``1``, ``true`` or ``yes`` (case-insensitive).

    Later calls return immediately once a scan has completed. A strict-mode
    failure leaves the flag unset, so the next call scans again.
    """
    global _BACKENDS_IMPORTED
    if _BACKENDS_IMPORTED:
        return

    def _strict_import_requested() -> bool:
        # Read lazily so the environment is consulted only after a failure.
        raw = environ.get("DLC_CAMERA_BACKENDS_STRICT_IMPORT", "")
        return raw.strip().lower() in ("1", "true", "yes")

    for package_name in _BUILTIN_BACKEND_PACKAGES:
        try:
            package = importlib.import_module(package_name)
        except Exception as exc:
            _BACKEND_IMPORT_ERRORS[package_name] = f"{type(exc).__name__}: {exc}"
            logger.exception("FAILED to import backend package '%s': %s", package_name, exc)
            if _strict_import_requested():
                raise
            # A missing package is tolerated: backends may all come from
            # third-party code (tests/plugins).
            continue

        # Plain modules have no __path__ and therefore no submodules to scan.
        search_path = getattr(package, "__path__", None)
        if not search_path:
            continue

        # Importing each submodule triggers its decorator side-effects.
        for _finder, module_name, _is_pkg in pkgutil.iter_modules(search_path, prefix=package_name + "."):
            try:
                importlib.import_module(module_name)
                logger.debug("Loaded camera backend module: %s", module_name)
            except Exception as exc:
                # Record and log loudly, with traceback.
                _BACKEND_IMPORT_ERRORS[module_name] = f"{type(exc).__name__}: {exc}"
                logger.exception("FAILED to import backend module '%s': %s", module_name, exc)
                # Optional fail-fast mode for CI/dev.
                if _strict_import_requested():
                    raise

    _BACKENDS_IMPORTED = True
class CameraFactory:
    """Create camera backend instances based on configuration.

    All methods are static. Each one first makes sure the built-in backend
    modules have been imported so the backend registry is populated.
    """

    @staticmethod
    def backend_names() -> Iterable[str]:
        """Return the identifiers of all known (registered) backends."""
        _ensure_backends_loaded()
        return tuple(_BACKEND_REGISTRY.keys())

    @staticmethod
    def available_backends() -> dict[str, bool]:
        """Return a mapping of backend names to availability flags.

        A backend that cannot be resolved is reported as ``False``; otherwise
        the flag is whatever the backend's ``is_available()`` returns.
        """
        _ensure_backends_loaded()
        availability: dict[str, bool] = {}
        for name in _BACKEND_REGISTRY:
            try:
                backend_cls = CameraFactory._resolve_backend(name)
            except RuntimeError:
                availability[name] = False
                continue
            availability[name] = backend_cls.is_available()
        return availability

    @staticmethod
    def backend_capabilities(backend: str) -> dict[str, SupportLevel]:
        """Return the backend's static capabilities.

        Safe to call even if the backend is unavailable: when the backend
        cannot be resolved, or its ``static_capabilities()`` raises, a copy of
        ``DEFAULT_CAPABILITIES`` is returned instead. An empty ``backend``
        falls back to ``"opencv"``.
        """
        _ensure_backends_loaded()
        key = (backend or "opencv").lower()
        try:
            backend_cls = CameraFactory._resolve_backend(key)
        except Exception:
            return dict(DEFAULT_CAPABILITIES)
        try:
            return backend_cls.static_capabilities()
        except Exception:
            return dict(DEFAULT_CAPABILITIES)

    @staticmethod
    def detect_cameras(
        backend: str,
        max_devices: int = 10,
        *,
        should_cancel: Callable[[], bool] | None = None,
        progress_cb: Callable[[str], None] | None = None,
    ) -> list[DetectedCamera]:
        """Probe ``backend`` for available cameras.

        Parameters
        ----------
        backend:
            The backend identifier, e.g. ``"opencv"``.
        max_devices:
            Upper bound for the indices that should be probed.
            For backends with get_device_count (GenTL, Aravis), the actual device count is queried.
        should_cancel:
            Optional callable that returns True if discovery should be canceled.
            When cancellation is requested, the function returns the cameras found so far.
        progress_cb:
            Optional callable to receive human-readable progress messages.

        Returns
        -------
        list of :class:`DetectedCamera`
            Sorted list of detected cameras with human readable labels (partial if canceled).
            Empty when the backend is unknown or reports itself unavailable.
        """
        _ensure_backends_loaded()

        def _canceled() -> bool:
            return bool(should_cancel and should_cancel())

        try:
            backend_cls = CameraFactory._resolve_backend(backend)
        except RuntimeError:
            return []
        if not backend_cls.is_available():
            return []

        # Resolve device count if possible
        num_devices = max_devices
        if hasattr(backend_cls, "get_device_count"):
            try:
                if _canceled():
                    return []
                actual_count = backend_cls.get_device_count()
                if actual_count >= 0:
                    num_devices = actual_count
            except Exception:
                # Best effort: keep probing up to max_devices.
                pass

        # ----------------------------------
        # "Rich discovery" path
        # Try backend-provided rich discovery first, which should retrieve more info.
        # This is implemented per-backend and may be faster/more reliable than probing
        # each index.
        # ----------------------------------
        try:
            if hasattr(backend_cls, "discover_devices"):
                rich = backend_cls.discover_devices(
                    max_devices=max_devices,
                    should_cancel=should_cancel,
                    progress_cb=progress_cb,
                )
                # None means "not supported here": fall through to probing.
                if rich is not None:
                    rich.sort(key=lambda c: c.index)
                    return rich
        except Exception:
            # NOTE Never fail discovery completely; fallback to probing
            logger.exception("Backend %s rich discovery failed; falling back to probing", backend)

        # ----------------------------------
        # "Probing" path : try to open each index and query info,
        # with optional quick presence check
        # ----------------------------------
        detected: list[DetectedCamera] = []
        # Suppress OpenCV warnings/errors during probing (e.g., "can't open camera by index")
        with _suppress_opencv_logging():
            try:
                for index in range(num_devices):
                    if _canceled():
                        # return partial results immediately
                        break
                    if progress_cb:
                        progress_cb(f"Probing {backend}:{index}…")

                    # Prefer quick presence check first
                    quick_ok = None
                    if hasattr(backend_cls, "quick_ping"):
                        try:
                            try:
                                quick_ok = bool(backend_cls.quick_ping(index))  # type: ignore[attr-defined]
                            except TypeError:
                                # Retry with the two-argument signature. This call is
                                # nested so that a failure here is also treated as
                                # "unknown" instead of aborting the whole discovery.
                                quick_ok = bool(backend_cls.quick_ping(index, None))  # type: ignore[attr-defined]
                        except Exception:
                            quick_ok = None
                    if quick_ok is False:
                        # Definitely not present, skip heavy open
                        continue

                    settings = CameraSettings(
                        name=f"Probe {index}",
                        index=index,
                        backend=backend,
                        properties={},
                    )
                    settings = backend_cls.sanitize_for_probe(settings)
                    backend_instance = backend_cls(settings)
                    try:
                        # This open() may block for a short time depending on driver/backend.
                        backend_instance.open()
                    except Exception:
                        # Not available → continue probing next index
                        pass
                    else:
                        label = backend_instance.device_name() or f"{backend.title()} #{index}"
                        detected.append(DetectedCamera(index=index, label=label))
                        if progress_cb:
                            progress_cb(f"Found {label}")
                    finally:
                        # Always release the device, whether or not open() succeeded.
                        try:
                            backend_instance.close()
                        except Exception:
                            pass

                    # Check cancel again between indices
                    if _canceled():
                        break
            except KeyboardInterrupt:
                # Graceful early exit with partial results
                if progress_cb:
                    progress_cb("Discovery interrupted.")
            # any other exception bubbles up to caller

        detected.sort(key=lambda camera: camera.index)
        return detected

    @staticmethod
    def create(settings: CameraSettings) -> CameraBackend:
        """Instantiate a backend for ``settings``.

        Raises
        ------
        RuntimeError
            If the backend name is not registered, if the backend rejects the
            options in ``settings``, or if the backend is not available.
        """
        # always ensure backends are loaded before creating,
        # to get accurate error reporting for unknown backends
        _ensure_backends_loaded()
        backend_name = (settings.backend or "opencv").lower()

        # Only backend resolution is guarded here, so "Unknown camera backend"
        # is reported exclusively for unknown backends and never wraps the
        # "Invalid ... options" error raised further down.
        try:
            backend_cls = CameraFactory._resolve_backend(backend_name)
        except RuntimeError as exc:  # pragma: no cover - runtime configuration
            raise RuntimeError(f"Unknown camera backend '{backend_name}': {exc}") from exc

        # Rebinding is best effort; on failure the original settings are used.
        try:
            if hasattr(backend_cls, "rebind_settings"):
                settings = backend_cls.rebind_settings(settings)
        except Exception:
            logger.debug("Backend %s rebind_settings failed during creation", backend_name, exc_info=True)

        try:
            backend_cls.parse_options(settings)  # ensures bad config fails loudly here
        except Exception as exc:
            raise RuntimeError(f"Invalid {backend_name} options: {exc}") from exc

        if not backend_cls.is_available():
            raise RuntimeError(
                f"Camera backend '{backend_name}' is not available. "
                "Ensure the required drivers and Python packages are installed."
            )
        return backend_cls(settings)

    @staticmethod
    def check_camera_available(settings: CameraSettings) -> tuple[bool, str]:
        """Check if a camera is present/accessible without pushing heavy settings like FPS.

        Returns
        -------
        tuple of (bool, str)
            ``(True, "")`` when the camera responds, otherwise ``(False, reason)``
            with a human readable explanation.
        """
        # always ensure backends are loaded before checking,
        # to get accurate error reporting for unknown backends
        _ensure_backends_loaded()
        dc = settings
        backend_name = (dc.backend or "opencv").lower()
        try:
            backend_cls = CameraFactory._resolve_backend(backend_name)
        except RuntimeError as exc:
            return False, f"Backend '{backend_name}' not installed: {exc}"
        if not backend_cls.is_available():
            return False, f"Backend '{backend_name}' is not available (missing drivers/packages)"

        # Allow backend to rebind settings for probing
        # This should be lightweight and avoid heavy settings like FPS/resolution
        try:
            if hasattr(backend_cls, "rebind_settings"):
                dc = backend_cls.rebind_settings(dc)
        except Exception:
            logger.debug("Backend %s rebind_settings failed during availability check", backend_name, exc_info=True)

        # Prefer quick presence test
        if hasattr(backend_cls, "quick_ping"):
            try:
                with _suppress_opencv_logging():
                    idx = int(dc.index)
                    ok = False
                    try:
                        ok = backend_cls.quick_ping(idx)  # type: ignore[attr-defined]
                    except TypeError:
                        # Retry with the two-argument signature.
                        ok = backend_cls.quick_ping(idx, None)  # type: ignore[attr-defined]
                    if ok:
                        return True, ""
                    return False, "Device not present"
            except Exception as exc:
                return False, f"Quick probe failed: {exc}"

        # Fallback: lightweight open/close with sanitized settings
        try:
            probe_settings = backend_cls.sanitize_for_probe(dc)
            backend_instance = backend_cls(probe_settings)
            with _suppress_opencv_logging():
                try:
                    backend_instance.open()
                except Exception:
                    # A failed open() may leave the device partially acquired;
                    # release it (best effort) before reporting the failure.
                    try:
                        backend_instance.close()
                    except Exception:
                        logger.debug(
                            "Backend %s close() failed after unsuccessful open()", backend_name, exc_info=True
                        )
                    raise
                backend_instance.close()
            return True, ""
        except Exception as exc:
            return False, f"Camera not accessible: {exc}"

    @staticmethod
    def backend_import_errors() -> dict[str, str]:
        """Return a copy of the import errors recorded while loading backends."""
        _ensure_backends_loaded()
        return dict(_BACKEND_IMPORT_ERRORS)

    @staticmethod
    def _resolve_backend(name: str) -> type[CameraBackend]:
        """Look up the backend class registered under ``name`` (case-insensitive).

        Raises
        ------
        RuntimeError
            If no backend is registered under that name. The message lists the
            registered backends and any recorded backend import errors.
        """
        _ensure_backends_loaded()
        key = name.lower()
        try:
            return _BACKEND_REGISTRY[key]
        except KeyError as exc:
            available = ", ".join(sorted(_BACKEND_REGISTRY.keys())) or "(none)"
            # Show import failures that might explain missing registration
            failing = (
                "\n".join(f"  - {mod}: {_BACKEND_IMPORT_ERRORS[mod]}" for mod in sorted(_BACKEND_IMPORT_ERRORS.keys()))
                or "  (no import errors recorded)"
            )
            msg = (
                f"Backend '{key}' not registered. Available: {available}\n"
                f"Backend module import errors (most likely cause):\n{failing}\n"
                "Tip: enable strict import failures with DLC_CAMERA_BACKENDS_STRICT_IMPORT=1"
            )
            raise RuntimeError(msg) from exc
# -------------------------------
# Camera identity utilities
# -------------------------------
def apply_detected_identity(cam: CameraSettings, detected: DetectedCamera, backend: str) -> None:
    """Copy identity details from ``detected`` into ``cam.properties[backend.lower()]``.

    ``cam.properties`` and the backend namespace inside it are replaced with
    fresh dicts when they are not already dicts. Only values that are present
    on ``detected`` are written; existing keys are otherwise left alone.
    """
    if not isinstance(cam.properties, dict):
        cam.properties = {}

    namespace_key = backend.lower()
    ns = cam.properties.get(namespace_key)
    if not isinstance(ns, dict):
        ns = {}
        cam.properties[namespace_key] = ns

    # (attribute on `detected`, key in the namespace, store as int?)
    # Integer fields are stored whenever they are not None (0 is valid);
    # the other fields are stored only when truthy. Order is the write order.
    field_table = (
        ("device_id", "device_id", False),
        ("vid", "device_vid", True),
        ("pid", "device_pid", True),
        ("path", "device_path", False),
        ("label", "device_name", False),  # human name, for matching fallback
        ("backend_hint", "backend_hint", True),  # e.g. CAP_DSHOW
    )
    for attr_name, target_key, as_int in field_table:
        value = getattr(detected, attr_name, None)
        if as_int:
            if value is not None:
                ns[target_key] = int(value)
        elif value:
            ns[target_key] = value
def camera_identity_key(cam: CameraSettings) -> tuple:
    """Return a hashable key identifying the camera described by ``cam``.

    The key is ``(backend, "device_id", <device_id>)`` when a truthy
    ``device_id`` is stored under the backend's namespace in
    ``cam.properties``, and ``(backend, "index", int(cam.index))`` otherwise.
    ``backend`` is the lower-cased ``cam.backend`` (empty string when unset).

    A ``properties`` value or backend namespace that is not a dict is treated
    as empty, so malformed settings fall back to the index-based key instead
    of raising.
    """
    backend = (cam.backend or "").lower()
    props = cam.properties if isinstance(cam.properties, dict) else {}

    # The namespace itself may be missing or malformed (e.g. None, a string).
    ns = props.get(backend)
    if not isinstance(ns, dict):
        ns = {}

    # Prefer stable identity if present, otherwise fall back to the index.
    device_id = ns.get("device_id")
    if device_id:
        return (backend, "device_id", device_id)
    return (backend, "index", int(cam.index))