|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# This file is part of Checkbox. |
| 3 | +# |
| 4 | +# Checkbox is free software: you can redistribute it and/or modify |
| 5 | +# it under the terms of the GNU General Public License version 3, |
| 6 | +# as published by the Free Software Foundation. |
| 7 | +# |
| 8 | +# Checkbox is distributed in the hope that it will be useful, |
| 9 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 10 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
| 11 | +# GNU General Public License for more details. |
| 12 | +# |
| 13 | +# You should have received a copy of the GNU General Public License |
| 14 | +# along with Checkbox. If not, see <http://www.gnu.org/licenses/>. |
| 15 | +import logging |
| 16 | +import os |
| 17 | + |
| 18 | +from enum import Enum |
| 19 | +from typing import Dict, List, Type |
| 20 | +from camera_utils import ( |
| 21 | + CameraInterface, |
| 22 | + execute_command, |
| 23 | + SupportedMethods, |
| 24 | + GST_LAUNCH_BIN, |
| 25 | + CameraError, |
| 26 | + CameraConfigurationError, |
| 27 | + VideoMediaNodeResolver, |
| 28 | + log_and_raise_error, |
| 29 | +) |
| 30 | + |
| 31 | +logger = logging.getLogger(__name__) |
| 32 | + |
| 33 | + |
| 34 | +class SupportedCamera(Enum): |
| 35 | + """ |
| 36 | + Supported camera modules on Genio platforms. |
| 37 | +
|
| 38 | + Each enum value corresponds to a concrete camera implementation class. |
| 39 | + The string value matches the camera module identifier used in the system. |
| 40 | + """ |
| 41 | + |
| 42 | + OV_5645 = "ov_5645" # OV_5645 sensor |
| 43 | + |
| 44 | + def __str__(self): |
| 45 | + return self.value |
| 46 | + |
| 47 | + |
| 48 | +def rz_camera_factory( |
| 49 | + platform: str, camera_module: str |
| 50 | +) -> Type[CameraInterface]: |
| 51 | + """ |
| 52 | + Factory function to create camera handler instances. |
| 53 | +
|
| 54 | + Args: |
| 55 | + camera_module: String identifier of the camera module |
| 56 | +
|
| 57 | + Returns: |
| 58 | + Camera handler class that implements CameraInterface |
| 59 | +
|
| 60 | + Raises: |
| 61 | + ValueError: If camera_module is not supported |
| 62 | + """ |
| 63 | + if "rz" not in platform: |
| 64 | + raise CameraError( |
| 65 | + "Unsupported platform: {}. Supported platform is: RZ".format( |
| 66 | + platform |
| 67 | + ) |
| 68 | + ) |
| 69 | + |
| 70 | + camera_handlers = { |
| 71 | + str(SupportedCamera.OV_5645): Ov5645Camera, |
| 72 | + } |
| 73 | + |
| 74 | + handler_class = camera_handlers.get(camera_module) |
| 75 | + if not handler_class: |
| 76 | + raise CameraError( |
| 77 | + "Unsupported camera module: {}. " |
| 78 | + "Supported modules are: {}".format( |
| 79 | + camera_module, list(camera_handlers.keys()) |
| 80 | + ) |
| 81 | + ) |
| 82 | + |
| 83 | + return handler_class |
| 84 | + |
| 85 | + |
| 86 | +class RzVideoNodeResolver(VideoMediaNodeResolver): |
| 87 | + """ |
| 88 | + RZ-specific video node resolver that extends the general |
| 89 | + VideoMediaNodeResolver. |
| 90 | +
|
| 91 | + This class provides RZ-specific logic for resolving video device nodes |
| 92 | + based on camera type and architecture while leveraging the general parsing |
| 93 | + functionality from VideoMediaNodeResolver. |
| 94 | + """ |
| 95 | + |
| 96 | + def get_camera_video_nodes( |
| 97 | + self, |
| 98 | + camera: SupportedCamera, |
| 99 | + v4l2_device_name: str, |
| 100 | + ) -> Dict[str, str]: |
| 101 | + """ |
| 102 | + Get video device nodes classified by camera type. |
| 103 | +
|
| 104 | + Args: |
| 105 | + camera: Camera type (SupportedCamera enum) |
| 106 | + v4l2_device_name: Name of the V4L2 device |
| 107 | +
|
| 108 | + Returns: |
| 109 | + Dictionary mapping node types to device paths |
| 110 | +
|
| 111 | + Raises: |
| 112 | + CameraError: For unsupported camera types or architectures |
| 113 | + CameraOperationError: For operational errors |
| 114 | + """ |
| 115 | + return self._resolve_common_camera(v4l2_device_name, camera.value) |
| 116 | + |
| 117 | + def _resolve_common_camera( |
| 118 | + self, |
| 119 | + v4l2_device_name: str, |
| 120 | + camera_value: str, |
| 121 | + ) -> Dict[str, str]: |
| 122 | + """Resolve video nodes for common cameras.""" |
| 123 | + # Common camera: simple video node lookup |
| 124 | + video_nodes = self.get_video_nodes(v4l2_device_name) |
| 125 | + self._validate_video_nodes( |
| 126 | + video_nodes, 1, camera_value, v4l2_device_name |
| 127 | + ) |
| 128 | + return {"all": video_nodes[0]} |
| 129 | + |
| 130 | + def _validate_video_nodes( |
| 131 | + self, |
| 132 | + dev_video_nodes: List[str], |
| 133 | + expected_count: int, |
| 134 | + camera_value: str, |
| 135 | + v4l2_device_name: str, |
| 136 | + ) -> None: |
| 137 | + """Validate video nodes count and log results.""" |
| 138 | + if not dev_video_nodes: |
| 139 | + log_and_raise_error( |
| 140 | + "No video device nodes found for '{}'".format( |
| 141 | + v4l2_device_name |
| 142 | + ), |
| 143 | + CameraConfigurationError, |
| 144 | + ) |
| 145 | + |
| 146 | + logger.info( |
| 147 | + "Found {} video device nodes for '{}' - '{}': {}".format( |
| 148 | + len(dev_video_nodes), |
| 149 | + v4l2_device_name, |
| 150 | + camera_value, |
| 151 | + dev_video_nodes, |
| 152 | + ) |
| 153 | + ) |
| 154 | + |
| 155 | + if len(dev_video_nodes) != expected_count: |
| 156 | + log_and_raise_error( |
| 157 | + "Expected {} video node(s) for {}, found {}".format( |
| 158 | + expected_count, camera_value, len(dev_video_nodes) |
| 159 | + ), |
| 160 | + CameraConfigurationError, |
| 161 | + ) |
| 162 | + |
| 163 | + |
| 164 | +class RzBaseCamera(CameraInterface): |
| 165 | + """ |
| 166 | + Base class for Genio camera implementations. |
| 167 | + """ |
| 168 | + |
| 169 | + def __init__(self, v4l2_devices: str): |
| 170 | + super().__init__(v4l2_devices) # Call ABC's __init__ |
| 171 | + self._v4l2_devices = v4l2_devices |
| 172 | + |
| 173 | + def _get_artifact_path( |
| 174 | + self, store_path: str, artifact_name: str, format: str |
| 175 | + ) -> str: |
| 176 | + """Get the appropriate file extension based on format.""" |
| 177 | + suffix = ".jpg" if format == "JPEG" else ".yuv" |
| 178 | + return os.path.join(store_path, artifact_name + suffix) |
| 179 | + |
| 180 | + def _build_gstreamer_cmd( |
| 181 | + self, |
| 182 | + dev_video_node: str, |
| 183 | + width: int, |
| 184 | + height: int, |
| 185 | + format: str, |
| 186 | + full_artifact_path: str, |
| 187 | + count: "int | None" = None, |
| 188 | + framerate: "int | None" = None, |
| 189 | + ) -> str: |
| 190 | + """Build the GStreamer command.""" |
| 191 | + assert GST_LAUNCH_BIN |
| 192 | + assert dev_video_node |
| 193 | + |
| 194 | + v4l2src_words = [ |
| 195 | + "v4l2src", |
| 196 | + "device={}".format(dev_video_node), |
| 197 | + "io-mode=dmabuf", |
| 198 | + "num-buffers={}".format(count or 30), |
| 199 | + ] |
| 200 | + caps_filter_words = [ |
| 201 | + "image/jpeg" if format == "JPEG" else "video/x-raw", |
| 202 | + "width={}".format(width), |
| 203 | + "height={}".format(height), |
| 204 | + "format={}".format(format), |
| 205 | + ] |
| 206 | + |
| 207 | + if framerate is not None: |
| 208 | + caps_filter_words.append("framerate={}/1".format(framerate)) |
| 209 | + |
| 210 | + if count is not None: |
| 211 | + sink = "filesink location={}".format(full_artifact_path) |
| 212 | + else: |
| 213 | + sink = "multifilesink location={} max-files=1".format( |
| 214 | + full_artifact_path |
| 215 | + ) |
| 216 | + |
| 217 | + return "{} -v {} ! {} ! {}".format( |
| 218 | + GST_LAUNCH_BIN, |
| 219 | + " ".join(v4l2src_words), |
| 220 | + ",".join(caps_filter_words), |
| 221 | + sink, |
| 222 | + ) |
| 223 | + |
| 224 | + def _build_v4l2_cmd( |
| 225 | + self, |
| 226 | + dev_video_node: str, |
| 227 | + width: int, |
| 228 | + height: int, |
| 229 | + format: str, |
| 230 | + full_artifact_path: str, |
| 231 | + count: "int | None" = None, |
| 232 | + ) -> str: |
| 233 | + """Build the v4l2 command.""" |
| 234 | + raise NotImplementedError( |
| 235 | + "v4l2-ctl is currently not implemented for RZ platforms." |
| 236 | + ) |
| 237 | + |
| 238 | + def _get_camera_dev_video_node(self, v4l2_device_name: str) -> dict: |
| 239 | + """Get the video device node for the given v4l2 device name.""" |
| 240 | + resolver = RzVideoNodeResolver(self._v4l2_devices) |
| 241 | + return resolver.get_camera_video_nodes(self._camera, v4l2_device_name) |
| 242 | + |
| 243 | + def capture_image( |
| 244 | + self, |
| 245 | + width: int, |
| 246 | + height: int, |
| 247 | + format: str, |
| 248 | + store_path: str, |
| 249 | + artifact_name: str, |
| 250 | + method: str, |
| 251 | + v4l2_device_name: str, |
| 252 | + ) -> None: |
| 253 | + """Capture an image using the specified method.""" |
| 254 | + full_artifact_path = self._get_artifact_path( |
| 255 | + store_path, artifact_name, format |
| 256 | + ) |
| 257 | + logging.info("Capture image as {}".format(full_artifact_path)) |
| 258 | + |
| 259 | + dev_video_nodes = self._get_camera_dev_video_node(v4l2_device_name) |
| 260 | + dev_video_node = dev_video_nodes.get("capture") or dev_video_nodes.get( |
| 261 | + "all" |
| 262 | + ) |
| 263 | + if not dev_video_node: |
| 264 | + log_and_raise_error( |
| 265 | + "No video device node found for {}".format(v4l2_device_name), |
| 266 | + CameraConfigurationError, |
| 267 | + ) |
| 268 | + |
| 269 | + logger.info("Capture image with {}".format(method)) |
| 270 | + |
| 271 | + if method == SupportedMethods.GSTREAMER: |
| 272 | + cmd = self._build_gstreamer_cmd( |
| 273 | + dev_video_node, width, height, format, full_artifact_path |
| 274 | + ) |
| 275 | + elif method == SupportedMethods.V4L2_CTL: |
| 276 | + cmd = self._build_v4l2_cmd( |
| 277 | + dev_video_node, width, height, format, full_artifact_path |
| 278 | + ) |
| 279 | + else: |
| 280 | + msg = "No suitable method such as '{}' or '{}' be provided".format( |
| 281 | + SupportedMethods.GSTREAMER, SupportedMethods.V4L2_CTL |
| 282 | + ) |
| 283 | + log_and_raise_error(msg, CameraConfigurationError) |
| 284 | + |
| 285 | + logger.info("Executing command:\n{}".format(cmd)) |
| 286 | + output = execute_command(cmd=cmd) |
| 287 | + logger.info("Output:\n{}".format(output)) |
| 288 | + |
| 289 | + def record_video( |
| 290 | + self, |
| 291 | + width: int, |
| 292 | + height: int, |
| 293 | + framerate: int, |
| 294 | + format: str, |
| 295 | + count: int, |
| 296 | + store_path: str, |
| 297 | + artifact_name: str, |
| 298 | + method: str, |
| 299 | + v4l2_device_name: str, |
| 300 | + ) -> None: |
| 301 | + """Record a video using the specified method.""" |
| 302 | + full_artifact_path = self._get_artifact_path( |
| 303 | + store_path, artifact_name, "YUV" |
| 304 | + ) |
| 305 | + logging.info("Record a video as {}".format(full_artifact_path)) |
| 306 | + |
| 307 | + dev_video_nodes = self._get_camera_dev_video_node(v4l2_device_name) |
| 308 | + dev_video_node = dev_video_nodes.get("record") or dev_video_nodes.get( |
| 309 | + "all" |
| 310 | + ) |
| 311 | + if not dev_video_node: |
| 312 | + log_and_raise_error( |
| 313 | + "No video device node found for {}".format(v4l2_device_name), |
| 314 | + CameraConfigurationError, |
| 315 | + ) |
| 316 | + |
| 317 | + logger.info("Record video with {}".format(method)) |
| 318 | + |
| 319 | + if method == SupportedMethods.GSTREAMER: |
| 320 | + cmd = self._build_gstreamer_cmd( |
| 321 | + dev_video_node, |
| 322 | + width, |
| 323 | + height, |
| 324 | + format, |
| 325 | + full_artifact_path, |
| 326 | + count=count, |
| 327 | + framerate=framerate, |
| 328 | + ) |
| 329 | + elif method == SupportedMethods.V4L2_CTL: |
| 330 | + cmd = self._build_v4l2_cmd( |
| 331 | + dev_video_node, width, height, format, full_artifact_path |
| 332 | + ) |
| 333 | + else: |
| 334 | + msg = "No suitable method such as '{}' or '{}' be provided".format( |
| 335 | + SupportedMethods.GSTREAMER, SupportedMethods.V4L2_CTL |
| 336 | + ) |
| 337 | + log_and_raise_error(msg, CameraConfigurationError) |
| 338 | + |
| 339 | + logger.info("Executing command:\n {}".format(cmd)) |
| 340 | + output = execute_command(cmd=cmd) |
| 341 | + logger.info("Output:\n{}".format(output)) |
| 342 | + |
| 343 | + |
| 344 | +class Ov5645Camera(RzBaseCamera): |
| 345 | + """Handler for OV 5645 camera.""" |
| 346 | + |
| 347 | + def __init__(self, v4l2_devices: str): |
| 348 | + super().__init__(v4l2_devices) |
| 349 | + self._camera = SupportedCamera.OV_5645 |
0 commit comments