Skip to content

Commit f75a897

Browse files
committed
tests: on_target: introduce prepare_dut fixture and UART live output
Add a shared `prepare_dut` fixture to `conftest.py` that consolidates the repeated flash/factory-reset/boot sequence used across all tests. All test files are updated to use it instead of duplicating the steps. Add structured UART live output control with a pretest suppression phase: output is suppressed during setup and automatically re-enabled when the boot banner is detected or a timeout is hit. Add `wait_for_app_boot` as a named helper for the boot banner check. Add lifecycle hooks that print structured markers around each test phase, and enable `--capture=tee-sys` in CI to capture and show stdout in test reports. Signed-off-by: Simen S. Røstad <simen.rostad@nordicsemi.no>
1 parent 9c97c89 commit f75a897

13 files changed

Lines changed: 197 additions & 95 deletions

File tree

.github/workflows/target-test.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ jobs:
159159
fi
160160
161161
pytest -v "${pytest_marker[@]}" \
162+
--capture=tee-sys \
162163
--junit-xml=results/test-results.xml \
163164
--html=results/test-results.html --self-contained-html \
164165
${PYTEST_PATH}

tests/on_target/tests/conftest.py

Lines changed: 78 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,15 @@
77
import re
88
import pytest
99
import types
10-
from utils.flash_tools import recover_device
10+
from utils.flash_tools import recover_device, flash_device, reset_device
1111
from utils.uart import Uart, UartBinary
1212
import sys
1313
sys.path.append(os.getcwd())
1414
from utils.logger import get_logger
1515
from utils.nrfcloud import NRFCloud, NRFCloudFOTA
1616

1717
logger = get_logger()
18+
TEST_OUTCOMES = {}
1819

1920
UART_TIMEOUT = 60 * 30
2021

@@ -30,10 +31,8 @@ def get_uarts():
3031

3132
if platform.system() == "Darwin": # macOS
3233
base_path = "/dev"
33-
pattern = "tty.*"
3434
else: # Linux
3535
base_path = "/dev/serial/by-id"
36-
pattern = None
3736

3837
try:
3938
if platform.system() == "Darwin":
@@ -57,12 +56,87 @@ def scan_log_for_assertions(log):
5756

5857
@pytest.hookimpl(tryfirst=True)
5958
def pytest_runtest_logstart(nodeid, location):
59+
print(f"\n===== TEST_ITEM_START {nodeid} =====", flush=True)
6060
logger.info(f"Starting test: {nodeid}")
6161

6262
@pytest.hookimpl(trylast=True)
6363
def pytest_runtest_logfinish(nodeid, location):
64+
print(f"===== TEST_ITEM_FINISH {nodeid} =====\n", flush=True)
6465
logger.info(f"Finished test: {nodeid}")
6566

67+
@pytest.hookimpl(tryfirst=True)
68+
def pytest_runtest_setup(item):
69+
print(f"----- TEST_SETUP_START {item.nodeid} -----", flush=True)
70+
71+
@pytest.hookimpl(tryfirst=True)
72+
def pytest_runtest_call(item):
73+
print(f"----- TEST_CALL_START {item.nodeid} -----", flush=True)
74+
# Tests using prepare_dut enable UART output after setup completes.
75+
if "prepare_dut" not in item.fixturenames:
76+
uart = _get_uart_for_item(item)
77+
if uart:
78+
uart.begin_test_run(item.nodeid)
79+
80+
@pytest.hookimpl(tryfirst=True)
81+
def pytest_runtest_teardown(item, nextitem):
82+
status = TEST_OUTCOMES.get(item.nodeid, "unknown")
83+
uart = _get_uart_for_item(item)
84+
if uart:
85+
if uart.is_pretest_phase_active():
86+
uart.end_pretest_phase(reason="teardown_fallback")
87+
uart.mark(f"UART_TEST_END {item.nodeid} status={status}")
88+
uart.set_live_output(False)
89+
print(f"----- TEST_TEARDOWN_END {item.nodeid} status={status} -----", flush=True)
90+
91+
@pytest.hookimpl(hookwrapper=True)
92+
def pytest_runtest_makereport(item, call):
93+
outcome = yield
94+
report = outcome.get_result()
95+
if report.when == "setup" and report.failed:
96+
TEST_OUTCOMES[item.nodeid] = "failed_setup"
97+
elif report.when == "call":
98+
TEST_OUTCOMES[item.nodeid] = report.outcome
99+
print(f"----- TEST_CALL_END {item.nodeid} outcome={report.outcome} -----", flush=True)
100+
elif report.when == "teardown" and item.nodeid not in TEST_OUTCOMES:
101+
TEST_OUTCOMES[item.nodeid] = report.outcome
102+
103+
def _get_uart_for_item(item):
104+
funcargs = getattr(item, "funcargs", {})
105+
for fixture_name in ("dut_cloud", "dut_fota", "dut_board", "dut_traces"):
106+
fixture = funcargs.get(fixture_name)
107+
if fixture and hasattr(fixture, "uart"):
108+
return fixture.uart
109+
return None
110+
111+
@pytest.fixture(scope="function")
112+
def prepare_dut(request):
113+
nodeid = request.node.nodeid
114+
115+
def _prepare_dut(dut, hex_file_path, boot_timeout=240, max_boot_retries=3):
116+
uart = dut.uart
117+
uart.begin_pretest_phase(nodeid)
118+
try:
119+
flash_device(os.path.abspath(hex_file_path))
120+
uart.xfactoryreset()
121+
uart.flush()
122+
reset_device()
123+
for attempt in range(max_boot_retries):
124+
try:
125+
uart.wait_for_app_boot(timeout=boot_timeout)
126+
break
127+
except AssertionError:
128+
if attempt == max_boot_retries - 1:
129+
raise
130+
reset_device()
131+
uart.end_pretest_phase(reason="setup_complete")
132+
uart.begin_test_run(nodeid)
133+
except Exception:
134+
if uart.is_pretest_phase_active():
135+
uart.end_pretest_phase(reason="setup_failed")
136+
raise
137+
138+
return _prepare_dut
139+
66140
@pytest.fixture(scope="session", autouse=True)
67141
def _purge_pending_fota_jobs():
68142
"""Cancel leftover FOTA jobs queued for the test device before any test runs. """
@@ -80,6 +154,7 @@ def dut_board():
80154
pytest.fail("No UARTs found")
81155
log_uart_string = all_uarts[0]
82156
uart = Uart(log_uart_string, timeout=UART_TIMEOUT)
157+
uart.set_live_output(False)
83158

84159
yield types.SimpleNamespace(
85160
uart=uart,

tests/on_target/tests/test_functional/test_buffer_flash.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import sys
88
import pytest
99
from time import sleep
10-
from utils.flash_tools import flash_device, reset_device
10+
from utils.flash_tools import reset_device
1111

1212
sys.path.append(os.getcwd())
1313
from utils.logger import get_logger
@@ -26,7 +26,7 @@ def get_header_str(datatype):
2626
return "Header file /att_storage/" + datatype + ".header already exists"
2727

2828
@pytest.mark.slow
29-
def test_buffer_flash(dut_cloud, hex_file_buffer_flash):
29+
def test_buffer_flash(dut_cloud, hex_file_buffer_flash, prepare_dut):
3030

3131
# Clear shadow config and command sections before starting the test to ensure a clean slate
3232
# and deterministic behavior.
@@ -39,8 +39,7 @@ def test_buffer_flash(dut_cloud, hex_file_buffer_flash):
3939
storage_threshold=FLASH_BUFFER_TEST_STORAGE_THRESHOLD
4040
)
4141

42-
flash_device(os.path.abspath(hex_file_buffer_flash))
43-
dut_cloud.uart.xfactoryreset()
42+
prepare_dut(dut_cloud, hex_file_buffer_flash)
4443

4544
clear_str = "att_storage clear\r\n"
4645

@@ -57,10 +56,8 @@ def test_buffer_flash(dut_cloud, hex_file_buffer_flash):
5756
]
5857

5958
try:
60-
reset_device()
61-
start_pos = dut_cloud.uart.get_size()
62-
6359
# Wait for storage automount
60+
start_pos = dut_cloud.uart.get_size()
6461
dut_cloud.uart.wait_for_str("Automount /att_storage succeeded", timeout=60, start_pos=start_pos)
6562

6663
# Clear buffer

tests/on_target/tests/test_functional/test_buffer_ram.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import os
77
import sys
88
import pytest
9-
from utils.flash_tools import flash_device, reset_device
9+
from utils.flash_tools import reset_device
1010

1111
sys.path.append(os.getcwd())
1212
from utils.logger import get_logger
@@ -26,7 +26,7 @@ def get_storing_str(datatype):
2626
return f"Stored {datatype} item, count: 1"
2727

2828
@pytest.mark.slow
29-
def test_buffer_ram(dut_cloud, hex_file_buffer_ram):
29+
def test_buffer_ram(dut_cloud, hex_file_buffer_ram, prepare_dut):
3030

3131
# Clear shadow config and command sections before starting the test to ensure a clean slate
3232
# and deterministic behavior.
@@ -41,8 +41,7 @@ def test_buffer_ram(dut_cloud, hex_file_buffer_ram):
4141

4242

4343
try:
44-
flash_device(os.path.abspath(hex_file_buffer_ram))
45-
dut_cloud.uart.xfactoryreset()
44+
prepare_dut(dut_cloud, hex_file_buffer_ram)
4645

4746
initialization_list = [
4847
get_initialized_str("BATTERY"),
@@ -56,9 +55,6 @@ def test_buffer_ram(dut_cloud, hex_file_buffer_ram):
5655
get_storing_str("LOCATION")
5756
]
5857

59-
dut_cloud.uart.flush()
60-
reset_device()
61-
6258
# Wait for storage initialization
6359
dut_cloud.uart.wait_for_str(initialization_list, timeout=60)
6460

tests/on_target/tests/test_functional/test_config.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import os
77
import time
8-
from utils.flash_tools import flash_device, reset_device
8+
from utils.flash_tools import reset_device
99
import sys
1010
sys.path.append(os.getcwd())
1111
from utils.logger import get_logger
@@ -59,7 +59,7 @@ def wait_for_config_reported(cloud, device_id, expected_sample, expected_thresho
5959
)
6060

6161

62-
def test_config(dut_cloud, hex_file):
62+
def test_config(dut_cloud, hex_file, prepare_dut):
6363
'''
6464
Test that verifies shadow changes are applied and reported back for all config
6565
parameters. Tests both boot-time config pickup and runtime config updates.
@@ -77,10 +77,7 @@ def test_config(dut_cloud, hex_file):
7777
storage_threshold=BOOT_STORAGE_THRESHOLD
7878
)
7979

80-
flash_device(os.path.abspath(hex_file))
81-
dut_cloud.uart.xfactoryreset()
82-
dut_cloud.uart.flush()
83-
reset_device()
80+
prepare_dut(dut_cloud, hex_file)
8481

8582
dut_cloud.uart.wait_for_str_with_retries(
8683
"Connected to Cloud",

tests/on_target/tests/test_functional/test_fota.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,13 @@ def run_fota_resumption(dut_fota, fota_type):
110110
# First disconnect at 20%
111111
timeout_20_percent = FULL_MFW_FOTA_TIMEOUT * 0.2
112112
dut_fota.uart.wait_for_str("20%", timeout=timeout_20_percent)
113-
logger.info("Performing first disconnect/reconnect at 20%")
113+
logger.info("Performing first disconnect/reconnect at 20 percent")
114114
perform_disconnect_reconnect(dut_fota, 20)
115115

116116
# Second disconnect at 80%
117117
timeout_80_percent = FULL_MFW_FOTA_TIMEOUT * 0.6 # Additional 60% of total timeout
118118
dut_fota.uart.wait_for_str("80%", timeout=timeout_80_percent)
119-
logger.info("Performing second disconnect/reconnect at 80%")
119+
logger.info("Performing second disconnect/reconnect at 80 percent")
120120
perform_disconnect_reconnect(dut_fota, 80)
121121

122122
def run_fota_reschedule(dut_fota, fota_type):
@@ -153,12 +153,9 @@ def run_fota_reschedule(dut_fota, fota_type):
153153
raise AssertionError(f"Fota update not available after {i} attempts")
154154

155155
@pytest.fixture
156-
def run_fota_fixture(dut_fota, hex_file, reschedule=False):
156+
def run_fota_fixture(dut_fota, hex_file, prepare_dut, reschedule=False):
157157
def _run_fota(bundle_id="", fota_type="app", fotatimeout=APP_FOTA_TIMEOUT, new_version=TEST_APP_VERSION, reschedule=False):
158-
flash_device(os.path.abspath(hex_file))
159-
dut_fota.uart.xfactoryreset()
160-
dut_fota.uart.flush()
161-
reset_device()
158+
prepare_dut(dut_fota, hex_file)
162159

163160
dut_fota.uart.wait_for_str_with_retries("Connected to Cloud", max_retries=3, timeout=240, reset_func=reset_device)
164161

tests/on_target/tests/test_functional/test_memfault.py

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import requests
99
import time
1010
import subprocess
11-
from utils.flash_tools import flash_device, reset_device
11+
from utils.flash_tools import reset_device
1212
from utils.logger import get_logger
1313
from datetime import datetime, timezone
1414

@@ -26,7 +26,7 @@
2626
def convert_binary_trace_to_pcap(binary_trace, pcapng_file):
2727
logger.info(f"Converting modem trace to pcap")
2828
try:
29-
result = subprocess.run(
29+
subprocess.run(
3030
['nrfutil', 'trace', 'lte', '--input-file', binary_trace, '--output-pcapng', pcapng_file],
3131
check=True,
3232
text=True,
@@ -64,7 +64,7 @@ def fetch_recent_modem_trace(device_id, start_time, end_time):
6464
reason = recording.get("reason")
6565

6666
if reason == "modem traces":
67-
return recording.get("id")
67+
return recording.get("id")
6868

6969
return None
7070

@@ -89,17 +89,14 @@ def timestamp(event):
8989
)
9090

9191
@pytest.mark.slow
92-
def test_memfault(dut_board, debug_hex_file):
92+
def test_memfault(dut_board, debug_hex_file, prepare_dut):
9393
# Save timestamp of latest coredump
9494
coredumps = get_latest_coredump_traces(UUID)
9595
logger.debug(f"Found coredumps: {coredumps}")
9696
timestamp_old_coredump = timestamp(coredumps[0]) if coredumps else None
9797
logger.debug(f"Timestamp old coredump: {timestamp_old_coredump}")
9898

99-
flash_device(os.path.abspath(debug_hex_file))
100-
dut_board.uart.xfactoryreset()
101-
dut_board.uart.flush()
102-
reset_device()
99+
prepare_dut(dut_board, debug_hex_file)
103100

104101
dut_board.uart.wait_for_str_with_retries("Connected to Cloud", max_retries=3, timeout=240, reset_func=reset_device)
105102

@@ -115,19 +112,19 @@ def test_memfault(dut_board, debug_hex_file):
115112
# Wait for upload to be reported to memfault api
116113
start = time.time()
117114
while time.time() - start < MEMFAULT_TIMEOUT:
118-
time.sleep(5)
119-
coredumps = get_latest_coredump_traces(UUID)
120-
logger.debug(f"Found coredumps: {coredumps}")
121-
timestamp_new_coredump = timestamp(coredumps[0]) if coredumps else None
122-
logger.debug(f"Timestamp new coredump: {timestamp_new_coredump}")
123-
124-
if not coredumps:
125-
continue
126-
# Check that we have an upload with newer timestamp
127-
if not timestamp_old_coredump:
128-
break
129-
if timestamp(coredumps[0]) > timestamp_old_coredump:
130-
break
115+
time.sleep(5)
116+
coredumps = get_latest_coredump_traces(UUID)
117+
logger.debug(f"Found coredumps: {coredumps}")
118+
timestamp_new_coredump = timestamp(coredumps[0]) if coredumps else None
119+
logger.debug(f"Timestamp new coredump: {timestamp_new_coredump}")
120+
121+
if not coredumps:
122+
continue
123+
# Check that we have an upload with newer timestamp
124+
if not timestamp_old_coredump:
125+
break
126+
if timestamp(coredumps[0]) > timestamp_old_coredump:
127+
break
131128
else:
132129
raise RuntimeError("No new coredump observed")
133130

tests/on_target/tests/test_functional/test_mqtt.py

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
##########################################################################################
55

66
import os
7-
from utils.flash_tools import flash_device, reset_device
7+
from utils.flash_tools import reset_device
88
import sys
99
import pytest
1010

@@ -14,25 +14,20 @@
1414
logger = get_logger()
1515

1616
@pytest.mark.slow
17-
def test_mqtt_firmware(dut_board, hex_file_mqtt):
17+
def test_mqtt_firmware(dut_board, hex_file_mqtt, prepare_dut):
1818
"""Test the firmware with cloud MQTT module."""
1919
# Only run this test for thingy91x devices
2020
devicetype = os.getenv("DUT_DEVICE_TYPE")
2121
if devicetype != "thingy91x":
2222
pytest.skip("This test is only for thingy91x devices")
2323

24-
# Flash the firmware
25-
flash_device(os.path.abspath(hex_file_mqtt))
26-
dut_board.uart.xfactoryreset()
24+
# Flash and initialize the firmware
25+
prepare_dut(dut_board, hex_file_mqtt)
2726

2827
# Log patterns to check
2928
pattern_connect_to_broker = "network: lte_lc_evt_handler: PDN connection activated"
3029
pattern_cloud = "cloud: on_mqtt_connack: MQTT connection established, session present: 0"
3130

32-
# Cloud connection
33-
dut_board.uart.flush()
34-
reset_device()
35-
3631
dut_board.uart.wait_for_str_with_retries(pattern_cloud, max_retries=3, timeout=240, reset_func=reset_device)
3732

3833
# Wait for shadow request

0 commit comments

Comments
 (0)