diff --git a/.github/workflows/camera-test-rpicam-apps.yml b/.github/workflows/camera-test-rpicam-apps.yml index 0c129cfb..d756365a 100644 --- a/.github/workflows/camera-test-rpicam-apps.yml +++ b/.github/workflows/camera-test-rpicam-apps.yml @@ -1,8 +1,6 @@ name: Camera Tests on: - workflow_dispatch: - pull_request_target: branches: [main] @@ -12,8 +10,6 @@ permissions: jobs: trigger-camera-tests: runs-on: [self-hosted, camera-test-bridge] - if: >- - github.event.pull_request.head.repo.full_name == github.repository timeout-minutes: 120 steps: - name: Checkout camera_tester diff --git a/utils/test.py b/utils/test.py index e3aee49d..ab6fe57e 100755 --- a/utils/test.py +++ b/utils/test.py @@ -24,6 +24,49 @@ import numpy as np +def detect_imx500(): + """Check if an IMX500 sensor is connected by scanning v4l-subdev sysfs entries.""" + for i in range(16): + module_link = f'/sys/class/video4linux/v4l-subdev{i}/device/driver/module' + try: + target = os.readlink(module_link) + if 'imx500' in target: + return True + except OSError: + continue + return False + + +def detect_hailo(): + """Detect Hailo accelerator and return architecture string, or None if not found.""" + try: + result = subprocess.run(['hailortcli', 'fw-control', 'identify'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + for line in result.stdout.splitlines(): + if 'Device Architecture' in line: + arch = line.split(':')[-1].strip() + if arch in ('HAILO8', 'HAILO8L', 'HAILO10H'): + return arch + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + return None + + +def detect_num_cameras(exe_dir): + """Count the number of cameras detected by rpicam-hello --list-cameras.""" + import re + try: + executable = os.path.join(exe_dir, 'rpicam-hello') + result = subprocess.run([executable, '--list-cameras'], + capture_output=True, text=True, timeout=10) + if result.returncode == 0: + return len(re.findall(r'^\d+ : ', result.stdout, re.MULTILINE)) + except (FileNotFoundError, subprocess.TimeoutExpired): + pass + return 0 + + def get_platform(): platform = 'vc4' try: @@ -148,6 +191,38 @@ def test_hello(exe_dir, output_dir, preview_dir): check_retcode(retcode, "test_hello: no-raw test") check_time(time_taken, 1, 6, "test_hello: no-raw test") + # Multi-camera tests. Require at least 2 cameras. + num_cameras = detect_num_cameras(exe_dir) + if num_cameras < 2: + print(" Skipping multi-camera tests - less than 2 cameras detected") + else: + # "camera selection test". Run with --camera 1 to verify non-default camera works. + print(" camera selection test") + retcode, time_taken = run_executable( + args + ['-t', '2000', '--camera', '1'], logfile) + check_retcode(retcode, "test_hello: camera selection test") + check_time(time_taken, 1, 6, "test_hello: camera selection test") + + # "dual camera test". Run two cameras simultaneously. + print(" dual camera test") + logfile0 = os.path.join(output_dir, 'log0.txt') + logfile1 = os.path.join(output_dir, 'log1.txt') + args0 = [executable, '-t', '10000', '--camera', '0'] + args1 = [executable, '-t', '10000', '--camera', '1'] + if preview_dir: + args0 += ['--preview-libs', preview_dir] + args1 += ['--preview-libs', preview_dir] + start_time = timer() + with open(logfile0, 'w') as lf0, open(logfile1, 'w') as lf1: + p0 = subprocess.Popen(args0, stdout=lf0, stderr=subprocess.STDOUT) + p1 = subprocess.Popen(args1, stdout=lf1, stderr=subprocess.STDOUT) + p0.communicate() + p1.communicate() + time_taken = timer() - start_time + check_retcode(p0.returncode, "test_hello: dual camera test (camera 0)") + check_retcode(p1.returncode, "test_hello: dual camera test (camera 1)") + check_time(time_taken, 8, 20, "test_hello: dual camera test") + print("rpicam-hello tests passed") @@ -636,9 +711,123 @@ def test_post_processing(exe_dir, output_dir, json_dir, postproc_dir): if log_text.find('Inference time') < 0: # relies on "verbose" being set in the JSON raise TestFailure("test_post_processing: detect test - TFLite model did not run") + # IMX500 tests - only if hardware detected. + if detect_imx500(): + test_imx500(exe_dir, output_dir, json_dir, postproc_dir) + else: + print("Skipping IMX500 tests - no IMX500 sensor detected") + + # Hailo tests - only if hardware detected. + if detect_hailo(): + test_hailo(exe_dir, output_dir, json_dir, postproc_dir) + else: + print("Skipping Hailo tests - no Hailo device detected") + print("post-processing tests passed") +def test_imx500(exe_dir, output_dir, json_dir, postproc_dir): + logfile = os.path.join(output_dir, 'log.txt') + print("Testing IMX500 post-processing") + clean_dir(output_dir) + + # Check that the IMX500 post-processing library is available. + if postproc_dir: + imx500_so = os.path.join(postproc_dir, 'imx500-postproc.so') + if not os.path.isfile(imx500_so): + print("WARNING: imx500-postproc.so not found in", postproc_dir, "- skipping IMX500 tests") + return + + executable = os.path.join(exe_dir, 'rpicam-hello') + check_exists(executable, 'test_imx500') + + # "object detection test". Run IMX500 MobileNet SSD object detection. + print(" object detection test") + json_file = os.path.join(json_dir, 'imx500_mobilenet_ssd.json') + check_exists(json_file, 'test_imx500') + try: + json_object = json.load(open(json_file, 'r')) + stage_name = list(json_object.keys())[0] + network_file = json_object[stage_name]['network_file'] + check_exists(network_file, 'test_imx500') + except Exception: + print('WARNING: test_imx500: object detection test - model unavailable, skipping test') + else: + args = [executable, '-t', '5000', '--post-process-file', json_file] + if postproc_dir: + args += ['--post-process-libs', postproc_dir] + retcode, time_taken = run_executable(args, logfile) + check_retcode(retcode, "test_imx500: object detection test") + check_time(time_taken, 3, 360, "test_imx500: object detection test") + log_text = open(logfile, 'r').read() + if 'No post processing stage found' in log_text: + print("WARNING: test_imx500: object detection test - missing stages, test incomplete") + + # "posenet test". Run IMX500 posenet. + print(" posenet test") + json_file = os.path.join(json_dir, 'imx500_posenet.json') + check_exists(json_file, 'test_imx500') + try: + json_object = json.load(open(json_file, 'r')) + stage_name = list(json_object.keys())[0] + network_file = json_object[stage_name]['network_file'] + check_exists(network_file, 'test_imx500') + except Exception: + print('WARNING: test_imx500: posenet test - model unavailable, skipping test') + else: + args = [executable, '-t', '5000', '--post-process-file', json_file] + if postproc_dir: + args += ['--post-process-libs', postproc_dir] + retcode, time_taken = run_executable(args, logfile) + check_retcode(retcode, "test_imx500: posenet test") + check_time(time_taken, 3, 360, "test_imx500: posenet test") + log_text = open(logfile, 'r').read() + if 'No post processing stage found' in log_text: + print("WARNING: test_imx500: posenet test - missing stages, test incomplete") + + print("IMX500 tests passed") + + +def test_hailo(exe_dir, output_dir, json_dir, postproc_dir): + logfile = os.path.join(output_dir, 'log.txt') + print("Testing Hailo post-processing") + clean_dir(output_dir) + + # Check that the Hailo post-processing library is available. + if postproc_dir: + hailo_so = os.path.join(postproc_dir, 'hailo-postproc.so') + if not os.path.isfile(hailo_so): + print("WARNING: hailo-postproc.so not found in", postproc_dir, "- skipping Hailo tests") + return + + executable = os.path.join(exe_dir, 'rpicam-hello') + check_exists(executable, 'test_hailo') + + tests = [ + ("yolo inference test", 'hailo_yolov8_inference.json'), + ("classifier test", 'hailo_classifier.json'), + ("pose estimation test", 'hailo_yolov8_pose.json'), + ] + + for test_name, json_name in tests: + print(" ", test_name) + json_file = os.path.join(json_dir, json_name) + check_exists(json_file, 'test_hailo') + + args = [executable, '-t', '5000', '--post-process-file', json_file, + '--lores-width', '640', '--lores-height', '640'] + if postproc_dir: + args += ['--post-process-libs', postproc_dir] + retcode, time_taken = run_executable(args, logfile) + check_retcode(retcode, "test_hailo: " + test_name) + check_time(time_taken, 3, 20, "test_hailo: " + test_name) + log_text = open(logfile, 'r').read() + if 'No post processing stage found' in log_text: + print("WARNING: test_hailo:", test_name, "- missing stages, test incomplete") + + print("Hailo tests passed") + + def test_all(apps, exe_dir, output_dir, json_dir, postproc_dir, preview_dir, encoder_dir): try: if 'hello' in apps: