Skip to content

Commit fa301f6

Browse files
committed
tests/run-tests: Add itest_ framework for interactive PTY tests.
This adds support for itest_* test files that require CPython to control MicroPython via a PTY with signal generation enabled. The framework handles all PTY setup, terminal configuration (ISIG, controlling terminal), subprocess spawning, and cleanup. Test files contain only test-specific logic and are exec'd with the master PTY fd available in their global scope. This allows testing terminal features like Ctrl-C interrupt handling that require signal generation and cannot be tested via the existing repl_ framework (which sends complete lines and cannot interrupt mid-execution). Signed-off-by: Andrew Leech <[email protected]>
1 parent 3551ee7 commit fa301f6

File tree

1 file changed

+136
-14
lines changed

1 file changed

+136
-14
lines changed

tests/run-tests.py

Lines changed: 136 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,10 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False):
604604
# special handling for tests of the unix cmdline program
605605
is_special = True
606606

607+
# Interactive tests (itest_*) also need special handling
608+
if os.path.basename(test_file).startswith("itest_"):
609+
is_special = True
610+
607611
if is_special:
608612
# check for any cmdline options needed for this test
609613
args = [MICROPYTHON]
@@ -615,7 +619,90 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False):
615619

616620
# run the test, possibly with redirected input
617621
try:
618-
if os.path.basename(test_file).startswith("repl_"):
622+
if os.path.basename(test_file).startswith("itest_"):
623+
# Interactive test - CPython code that controls MicroPython via PTY
624+
try:
625+
import pty
626+
import termios
627+
import fcntl
628+
except ImportError:
629+
# in case pty/termios module is not available, like on Windows
630+
return b"SKIP\n"
631+
import select
632+
from io import StringIO
633+
634+
# Even though these might have the pty module, it's unlikely to function.
635+
if sys.platform in ["win32", "msys", "cygwin"]:
636+
return b"SKIP\n"
637+
638+
# Set up PTY with ISIG enabled for signal generation
639+
master, slave = pty.openpty()
640+
641+
# Configure terminal attributes to enable Ctrl-C signal generation
642+
attrs = termios.tcgetattr(slave)
643+
attrs[3] |= termios.ISIG # Enable signal generation (ISIG flag)
644+
attrs[6][termios.VINTR] = 3 # Set Ctrl-C (0x03) as interrupt character
645+
termios.tcsetattr(slave, termios.TCSANOW, attrs)
646+
647+
def setup_controlling_terminal():
648+
"""Set up the child process with the PTY as controlling terminal."""
649+
os.setsid() # Create a new session
650+
fcntl.ioctl(0, termios.TIOCSCTTY, 0) # Make PTY the controlling terminal
651+
652+
# Spawn MicroPython with the PTY as its stdin/stdout/stderr
653+
p = subprocess.Popen(
654+
args,
655+
stdin=slave,
656+
stdout=slave,
657+
stderr=subprocess.STDOUT,
658+
bufsize=0,
659+
preexec_fn=setup_controlling_terminal,
660+
)
661+
662+
# Capture stdout while running the test code
663+
# Use lock to prevent other threads from printing while we redirect stdout
664+
with stdout_lock:
665+
sys.stdout.flush() # Flush any buffered output before redirecting
666+
old_stdout = sys.stdout
667+
sys.stdout = StringIO()
668+
669+
try:
670+
# Execute test file with utilities available in globals:
671+
# - master: PTY master file descriptor
672+
# - read_until_quiet: Function to read from PTY until no data for timeout seconds
673+
test_globals = {
674+
"master": master,
675+
"read_until_quiet": pty_read_until_quiet,
676+
}
677+
with open(test_file_abspath, "rb") as f:
678+
exec(compile(f.read(), test_file_abspath, "exec"), test_globals)
679+
output_mupy = sys.stdout.getvalue().encode("utf-8")
680+
except SystemExit:
681+
# Test requested to exit (e.g., after printing SKIP)
682+
output_mupy = sys.stdout.getvalue().encode("utf-8")
683+
except Exception as e:
684+
output_mupy = f"CRASH: {e}\n".encode("utf-8")
685+
finally:
686+
sys.stdout = old_stdout
687+
688+
# Clean up: send Ctrl-D to exit REPL and close PTY
689+
try:
690+
os.write(master, b"\x04") # Ctrl-D to exit
691+
except OSError:
692+
pass # Process may have already exited
693+
694+
try:
695+
p.wait(timeout=1)
696+
except subprocess.TimeoutExpired:
697+
p.kill()
698+
p.wait()
699+
except ProcessLookupError:
700+
pass
701+
702+
os.close(master)
703+
os.close(slave)
704+
705+
elif os.path.basename(test_file).startswith("repl_"):
619706
# Need to use a PTY to test command line editing
620707
try:
621708
import pty
@@ -629,14 +716,8 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False):
629716
return b"SKIP\n"
630717

631718
def get(required=False):
632-
rv = b""
633-
while True:
634-
ready = select.select([master], [], [], 0.02)
635-
if ready[0] == [master]:
636-
rv += os.read(master, 1024)
637-
else:
638-
if not required or rv:
639-
return rv
719+
# Use the unified pty_read_until_quiet function
720+
return pty_read_until_quiet(master, timeout=0.02, required=required)
640721

641722
def send_get(what):
642723
# Detect {\x00} pattern and convert to ctrl-key codes.
@@ -808,6 +889,42 @@ def value(self):
808889
return self._value
809890

810891

892+
# Global lock to protect stdout operations when running tests in parallel
893+
stdout_lock = threading.Lock()
894+
895+
896+
def pty_read_until_quiet(master, timeout=0.05, required=False):
897+
"""Read from PTY until no data for timeout seconds.
898+
899+
This is a unified utility function used by both repl_ and itest_ tests.
900+
901+
Args:
902+
master: PTY master file descriptor
903+
timeout: Seconds to wait for data before considering the stream quiet (default 0.05)
904+
required: If True, keep trying until at least some data is read (default False)
905+
906+
Returns:
907+
bytes: Data read from PTY
908+
"""
909+
import select
910+
import os
911+
912+
output = b""
913+
while True:
914+
ready = select.select([master], [], [], timeout)
915+
if ready[0]:
916+
data = os.read(master, 1024)
917+
if not data:
918+
# EOF reached
919+
break
920+
output += data
921+
else:
922+
# Timeout with no data available
923+
if not required or output:
924+
break
925+
return output
926+
927+
811928
class PyboardNodeRunner:
812929
def __init__(self):
813930
mjs = os.getenv("MICROPY_MICROPYTHON_MJS")
@@ -1098,7 +1215,8 @@ def run_one_test(test_file):
10981215
skip_it |= skip_inlineasm and is_inlineasm
10991216

11001217
if skip_it:
1101-
print("skip ", test_file)
1218+
with stdout_lock:
1219+
print("skip ", test_file)
11021220
test_results.append((test_file, "skip", ""))
11031221
return
11041222

@@ -1113,11 +1231,13 @@ def run_one_test(test_file):
11131231
# reset. Wait for the soft reset to finish, so we don't interrupt the
11141232
# start-up code (eg boot.py) when preparing to run the next test.
11151233
pyb.read_until(1, b"raw REPL; CTRL-B to exit\r\n")
1116-
print("skip ", test_file)
1234+
with stdout_lock:
1235+
print("skip ", test_file)
11171236
test_results.append((test_file, "skip", ""))
11181237
return
11191238
elif output_mupy == b"SKIP-TOO-LARGE\n":
1120-
print("lrge ", test_file)
1239+
with stdout_lock:
1240+
print("lrge ", test_file)
11211241
test_results.append((test_file, "skip", "too large"))
11221242
return
11231243

@@ -1204,12 +1324,14 @@ def run_one_test(test_file):
12041324

12051325
# Print test summary, update counters, and save .exp/.out files if needed.
12061326
if test_passed:
1207-
print("pass ", test_file, extra_info)
1327+
with stdout_lock:
1328+
print("pass ", test_file, extra_info)
12081329
test_results.append((test_file, "pass", ""))
12091330
rm_f(filename_expected)
12101331
rm_f(filename_mupy)
12111332
else:
1212-
print("FAIL ", test_file, extra_info)
1333+
with stdout_lock:
1334+
print("FAIL ", test_file, extra_info)
12131335
if output_expected is not None:
12141336
with open(filename_expected, "wb") as f:
12151337
f.write(output_expected)

0 commit comments

Comments
 (0)