Skip to content
Merged

Test #906

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .github/workflows/camera-test-rpicam-apps.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
name: Camera Tests

on:
workflow_dispatch:

pull_request_target:
branches: [main]

Expand All @@ -12,8 +10,6 @@ permissions:
jobs:
trigger-camera-tests:
runs-on: [self-hosted, camera-test-bridge]
if: >-
github.event.pull_request.head.repo.full_name == github.repository
timeout-minutes: 120
steps:
- name: Checkout camera_tester
Expand Down
189 changes: 189 additions & 0 deletions utils/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,49 @@
import numpy as np


def detect_imx500():
"""Check if an IMX500 sensor is connected by scanning v4l-subdev sysfs entries."""
for i in range(16):
module_link = f'/sys/class/video4linux/v4l-subdev{i}/device/driver/module'
try:
target = os.readlink(module_link)
if 'imx500' in target:
return True
except OSError:
continue
return False


def detect_hailo():
"""Detect Hailo accelerator and return architecture string, or None if not found."""
try:
result = subprocess.run(['hailortcli', 'fw-control', 'identify'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
for line in result.stdout.splitlines():
if 'Device Architecture' in line:
arch = line.split(':')[-1].strip()
if arch in ('HAILO8', 'HAILO8L', 'HAILO10H'):
return arch
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return None


def detect_num_cameras(exe_dir):
"""Count the number of cameras detected by rpicam-hello --list-cameras."""
import re
try:
executable = os.path.join(exe_dir, 'rpicam-hello')
result = subprocess.run([executable, '--list-cameras'],
capture_output=True, text=True, timeout=10)
if result.returncode == 0:
return len(re.findall(r'^\d+ : ', result.stdout, re.MULTILINE))
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
return 0


def get_platform():
platform = 'vc4'
try:
Expand Down Expand Up @@ -148,6 +191,38 @@ def test_hello(exe_dir, output_dir, preview_dir):
check_retcode(retcode, "test_hello: no-raw test")
check_time(time_taken, 1, 6, "test_hello: no-raw test")

# Multi-camera tests. Require at least 2 cameras.
num_cameras = detect_num_cameras(exe_dir)
if num_cameras < 2:
print(" Skipping multi-camera tests - less than 2 cameras detected")
else:
# "camera selection test". Run with --camera 1 to verify non-default camera works.
print(" camera selection test")
retcode, time_taken = run_executable(
args + ['-t', '2000', '--camera', '1'], logfile)
check_retcode(retcode, "test_hello: camera selection test")
check_time(time_taken, 1, 6, "test_hello: camera selection test")

# "dual camera test". Run two cameras simultaneously.
print(" dual camera test")
logfile0 = os.path.join(output_dir, 'log0.txt')
logfile1 = os.path.join(output_dir, 'log1.txt')
args0 = [executable, '-t', '10000', '--camera', '0']
args1 = [executable, '-t', '10000', '--camera', '1']
if preview_dir:
args0 += ['--preview-libs', preview_dir]
args1 += ['--preview-libs', preview_dir]
start_time = timer()
with open(logfile0, 'w') as lf0, open(logfile1, 'w') as lf1:
p0 = subprocess.Popen(args0, stdout=lf0, stderr=subprocess.STDOUT)
p1 = subprocess.Popen(args1, stdout=lf1, stderr=subprocess.STDOUT)
p0.communicate()
p1.communicate()
time_taken = timer() - start_time
check_retcode(p0.returncode, "test_hello: dual camera test (camera 0)")
check_retcode(p1.returncode, "test_hello: dual camera test (camera 1)")
check_time(time_taken, 8, 20, "test_hello: dual camera test")

print("rpicam-hello tests passed")


Expand Down Expand Up @@ -636,9 +711,123 @@ def test_post_processing(exe_dir, output_dir, json_dir, postproc_dir):
if log_text.find('Inference time') < 0: # relies on "verbose" being set in the JSON
raise TestFailure("test_post_processing: detect test - TFLite model did not run")

# IMX500 tests - only if hardware detected.
if detect_imx500():
test_imx500(exe_dir, output_dir, json_dir, postproc_dir)
else:
print("Skipping IMX500 tests - no IMX500 sensor detected")

# Hailo tests - only if hardware detected.
if detect_hailo():
test_hailo(exe_dir, output_dir, json_dir, postproc_dir)
else:
print("Skipping Hailo tests - no Hailo device detected")

print("post-processing tests passed")


def test_imx500(exe_dir, output_dir, json_dir, postproc_dir):
logfile = os.path.join(output_dir, 'log.txt')
print("Testing IMX500 post-processing")
clean_dir(output_dir)

# Check that the IMX500 post-processing library is available.
if postproc_dir:
imx500_so = os.path.join(postproc_dir, 'imx500-postproc.so')
if not os.path.isfile(imx500_so):
print("WARNING: imx500-postproc.so not found in", postproc_dir, "- skipping IMX500 tests")
return

executable = os.path.join(exe_dir, 'rpicam-hello')
check_exists(executable, 'test_imx500')

# "object detection test". Run IMX500 MobileNet SSD object detection.
print(" object detection test")
json_file = os.path.join(json_dir, 'imx500_mobilenet_ssd.json')
check_exists(json_file, 'test_imx500')
try:
json_object = json.load(open(json_file, 'r'))
stage_name = list(json_object.keys())[0]
network_file = json_object[stage_name]['network_file']
check_exists(network_file, 'test_imx500')
except Exception:
print('WARNING: test_imx500: object detection test - model unavailable, skipping test')
else:
args = [executable, '-t', '5000', '--post-process-file', json_file]
if postproc_dir:
args += ['--post-process-libs', postproc_dir]
retcode, time_taken = run_executable(args, logfile)
check_retcode(retcode, "test_imx500: object detection test")
check_time(time_taken, 3, 360, "test_imx500: object detection test")
log_text = open(logfile, 'r').read()
if 'No post processing stage found' in log_text:
print("WARNING: test_imx500: object detection test - missing stages, test incomplete")

# "posenet test". Run IMX500 posenet.
print(" posenet test")
json_file = os.path.join(json_dir, 'imx500_posenet.json')
check_exists(json_file, 'test_imx500')
try:
json_object = json.load(open(json_file, 'r'))
stage_name = list(json_object.keys())[0]
network_file = json_object[stage_name]['network_file']
check_exists(network_file, 'test_imx500')
except Exception:
print('WARNING: test_imx500: posenet test - model unavailable, skipping test')
else:
args = [executable, '-t', '5000', '--post-process-file', json_file]
if postproc_dir:
args += ['--post-process-libs', postproc_dir]
retcode, time_taken = run_executable(args, logfile)
check_retcode(retcode, "test_imx500: posenet test")
check_time(time_taken, 3, 360, "test_imx500: posenet test")
log_text = open(logfile, 'r').read()
if 'No post processing stage found' in log_text:
print("WARNING: test_imx500: posenet test - missing stages, test incomplete")

print("IMX500 tests passed")


def test_hailo(exe_dir, output_dir, json_dir, postproc_dir):
logfile = os.path.join(output_dir, 'log.txt')
print("Testing Hailo post-processing")
clean_dir(output_dir)

# Check that the Hailo post-processing library is available.
if postproc_dir:
hailo_so = os.path.join(postproc_dir, 'hailo-postproc.so')
if not os.path.isfile(hailo_so):
print("WARNING: hailo-postproc.so not found in", postproc_dir, "- skipping Hailo tests")
return

executable = os.path.join(exe_dir, 'rpicam-hello')
check_exists(executable, 'test_hailo')

tests = [
("yolo inference test", 'hailo_yolov8_inference.json'),
("classifier test", 'hailo_classifier.json'),
("pose estimation test", 'hailo_yolov8_pose.json'),
]

for test_name, json_name in tests:
print(" ", test_name)
json_file = os.path.join(json_dir, json_name)
check_exists(json_file, 'test_hailo')

args = [executable, '-t', '5000', '--post-process-file', json_file,
'--lores-width', '640', '--lores-height', '640']
if postproc_dir:
args += ['--post-process-libs', postproc_dir]
retcode, time_taken = run_executable(args, logfile)
check_retcode(retcode, "test_hailo: " + test_name)
check_time(time_taken, 3, 20, "test_hailo: " + test_name)
log_text = open(logfile, 'r').read()
if 'No post processing stage found' in log_text:
print("WARNING: test_hailo:", test_name, "- missing stages, test incomplete")

print("Hailo tests passed")


def test_all(apps, exe_dir, output_dir, json_dir, postproc_dir, preview_dir, encoder_dir):
try:
if 'hello' in apps:
Expand Down
Loading