Skip to content

Commit 27ae8aa

Browse files
committed
tests/run-tests: Add itest_ framework for interactive PTY tests.
This adds support for itest_* test files that require CPython to control MicroPython via a PTY with signal generation enabled. The framework handles all PTY setup, terminal configuration (ISIG, controlling terminal), subprocess spawning, and cleanup. Test files contain only test-specific logic and are exec'd with the master PTY fd available in their global scope. This allows testing terminal features like Ctrl-C interrupt handling that require signal generation and cannot be tested via the existing repl_ framework (which sends complete lines and cannot interrupt mid-execution). Signed-off-by: Andrew Leech <[email protected]>
1 parent e0a9b70 commit 27ae8aa

File tree

1 file changed

+136
-14
lines changed

1 file changed

+136
-14
lines changed

tests/run-tests.py

Lines changed: 136 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,10 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False):
587587
# special handling for tests of the unix cmdline program
588588
is_special = True
589589

590+
# Interactive tests (itest_*) also need special handling
591+
if os.path.basename(test_file).startswith("itest_"):
592+
is_special = True
593+
590594
if is_special:
591595
# check for any cmdline options needed for this test
592596
args = [MICROPYTHON]
@@ -598,7 +602,90 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False):
598602

599603
# run the test, possibly with redirected input
600604
try:
601-
if os.path.basename(test_file).startswith("repl_"):
605+
if os.path.basename(test_file).startswith("itest_"):
606+
# Interactive test - CPython code that controls MicroPython via PTY
607+
try:
608+
import pty
609+
import termios
610+
import fcntl
611+
except ImportError:
612+
# in case pty/termios module is not available, like on Windows
613+
return b"SKIP\n"
614+
import select
615+
from io import StringIO
616+
617+
# Even though these might have the pty module, it's unlikely to function.
618+
if sys.platform in ["win32", "msys", "cygwin"]:
619+
return b"SKIP\n"
620+
621+
# Set up PTY with ISIG enabled for signal generation
622+
master, slave = pty.openpty()
623+
624+
# Configure terminal attributes to enable Ctrl-C signal generation
625+
attrs = termios.tcgetattr(slave)
626+
attrs[3] |= termios.ISIG # Enable signal generation (ISIG flag)
627+
attrs[6][termios.VINTR] = 3 # Set Ctrl-C (0x03) as interrupt character
628+
termios.tcsetattr(slave, termios.TCSANOW, attrs)
629+
630+
def setup_controlling_terminal():
631+
"""Set up the child process with the PTY as controlling terminal."""
632+
os.setsid() # Create a new session
633+
fcntl.ioctl(0, termios.TIOCSCTTY, 0) # Make PTY the controlling terminal
634+
635+
# Spawn MicroPython with the PTY as its stdin/stdout/stderr
636+
p = subprocess.Popen(
637+
args,
638+
stdin=slave,
639+
stdout=slave,
640+
stderr=subprocess.STDOUT,
641+
bufsize=0,
642+
preexec_fn=setup_controlling_terminal,
643+
)
644+
645+
# Capture stdout while running the test code
646+
# Use lock to prevent other threads from printing while we redirect stdout
647+
with stdout_lock:
648+
sys.stdout.flush() # Flush any buffered output before redirecting
649+
old_stdout = sys.stdout
650+
sys.stdout = StringIO()
651+
652+
try:
653+
# Execute test file with utilities available in globals:
654+
# - master: PTY master file descriptor
655+
# - read_until_quiet: Function to read from PTY until no data for timeout seconds
656+
test_globals = {
657+
"master": master,
658+
"read_until_quiet": pty_read_until_quiet,
659+
}
660+
with open(test_file_abspath, "rb") as f:
661+
exec(compile(f.read(), test_file_abspath, "exec"), test_globals)
662+
output_mupy = sys.stdout.getvalue().encode("utf-8")
663+
except SystemExit:
664+
# Test requested to exit (e.g., after printing SKIP)
665+
output_mupy = sys.stdout.getvalue().encode("utf-8")
666+
except Exception as e:
667+
output_mupy = f"CRASH: {e}\n".encode("utf-8")
668+
finally:
669+
sys.stdout = old_stdout
670+
671+
# Clean up: send Ctrl-D to exit REPL and close PTY
672+
try:
673+
os.write(master, b"\x04") # Ctrl-D to exit
674+
except OSError:
675+
pass # Process may have already exited
676+
677+
try:
678+
p.wait(timeout=1)
679+
except subprocess.TimeoutExpired:
680+
p.kill()
681+
p.wait()
682+
except ProcessLookupError:
683+
pass
684+
685+
os.close(master)
686+
os.close(slave)
687+
688+
elif os.path.basename(test_file).startswith("repl_"):
602689
# Need to use a PTY to test command line editing
603690
try:
604691
import pty
@@ -612,14 +699,8 @@ def run_micropython(pyb, args, test_file, test_file_abspath, is_special=False):
612699
return b"SKIP\n"
613700

614701
def get(required=False):
615-
rv = b""
616-
while True:
617-
ready = select.select([master], [], [], 0.02)
618-
if ready[0] == [master]:
619-
rv += os.read(master, 1024)
620-
else:
621-
if not required or rv:
622-
return rv
702+
# Use the unified pty_read_until_quiet function
703+
return pty_read_until_quiet(master, timeout=0.02, required=required)
623704

624705
def send_get(what):
625706
# Detect {\x00} pattern and convert to ctrl-key codes.
@@ -791,6 +872,42 @@ def value(self):
791872
return self._value
792873

793874

875+
# Global lock to protect stdout operations when running tests in parallel
876+
stdout_lock = threading.Lock()
877+
878+
879+
def pty_read_until_quiet(master, timeout=0.05, required=False):
880+
"""Read from PTY until no data for timeout seconds.
881+
882+
This is a unified utility function used by both repl_ and itest_ tests.
883+
884+
Args:
885+
master: PTY master file descriptor
886+
timeout: Seconds to wait for data before considering the stream quiet (default 0.05)
887+
required: If True, keep trying until at least some data is read (default False)
888+
889+
Returns:
890+
bytes: Data read from PTY
891+
"""
892+
import select
893+
import os
894+
895+
output = b""
896+
while True:
897+
ready = select.select([master], [], [], timeout)
898+
if ready[0]:
899+
data = os.read(master, 1024)
900+
if not data:
901+
# EOF reached
902+
break
903+
output += data
904+
else:
905+
# Timeout with no data available
906+
if not required or output:
907+
break
908+
return output
909+
910+
794911
class PyboardNodeRunner:
795912
def __init__(self):
796913
mjs = os.getenv("MICROPY_MICROPYTHON_MJS")
@@ -1081,7 +1198,8 @@ def run_one_test(test_file):
10811198
skip_it |= skip_inlineasm and is_inlineasm
10821199

10831200
if skip_it:
1084-
print("skip ", test_file)
1201+
with stdout_lock:
1202+
print("skip ", test_file)
10851203
test_results.append((test_file, "skip", ""))
10861204
return
10871205

@@ -1096,11 +1214,13 @@ def run_one_test(test_file):
10961214
# reset. Wait for the soft reset to finish, so we don't interrupt the
10971215
# start-up code (eg boot.py) when preparing to run the next test.
10981216
pyb.read_until(1, b"raw REPL; CTRL-B to exit\r\n")
1099-
print("skip ", test_file)
1217+
with stdout_lock:
1218+
print("skip ", test_file)
11001219
test_results.append((test_file, "skip", ""))
11011220
return
11021221
elif output_mupy == b"SKIP-TOO-LARGE\n":
1103-
print("lrge ", test_file)
1222+
with stdout_lock:
1223+
print("lrge ", test_file)
11041224
test_results.append((test_file, "skip", "too large"))
11051225
return
11061226

@@ -1187,12 +1307,14 @@ def run_one_test(test_file):
11871307

11881308
# Print test summary, update counters, and save .exp/.out files if needed.
11891309
if test_passed:
1190-
print("pass ", test_file, extra_info)
1310+
with stdout_lock:
1311+
print("pass ", test_file, extra_info)
11911312
test_results.append((test_file, "pass", ""))
11921313
rm_f(filename_expected)
11931314
rm_f(filename_mupy)
11941315
else:
1195-
print("FAIL ", test_file, extra_info)
1316+
with stdout_lock:
1317+
print("FAIL ", test_file, extra_info)
11961318
if output_expected is not None:
11971319
with open(filename_expected, "wb") as f:
11981320
f.write(output_expected)

0 commit comments

Comments
 (0)