2
2
# SPDX-License-Identifier: LGPL-2.1-or-later
3
3
4
4
5
- import subprocess
5
+ import os
6
6
import sys
7
7
import tempfile
8
+ import types
8
9
10
+ import drgn .cli
9
11
from tests import TestCase
10
12
11
13
12
14
class TestCli (TestCase ):
15
+ def run_cli (self , args , * , input = None ):
16
+ stdout_r , stdout_w = os .pipe ()
17
+ stderr_r , stderr_w = os .pipe ()
18
+ if input is not None :
19
+ stdin_r , stdin_w = os .pipe ()
13
20
14
- def run_cli (self , * args : str , ** kwargs ):
15
- try :
16
- return subprocess .run (
17
- [sys .executable , "-m" , "drgn" ] + list (args ),
18
- stdout = subprocess .PIPE ,
19
- stderr = subprocess .PIPE ,
20
- check = True ,
21
- ** kwargs ,
21
+ pid = os .fork ()
22
+ if pid == 0 :
23
+ os .close (stdout_r )
24
+ sys .stdout = open (stdout_w , "w" )
25
+ os .close (stderr_r )
26
+ sys .stderr = open (stderr_w , "w" )
27
+
28
+ if input is not None :
29
+ os .close (stdin_w )
30
+ sys .stdin = open (stdin_r , "r" )
31
+
32
+ sys .argv = ["drgn" ] + args
33
+
34
+ drgn .cli ._main ()
35
+
36
+ sys .stdout .flush ()
37
+ sys .stderr .flush ()
38
+ os ._exit (0 )
39
+
40
+ os .close (stdout_w )
41
+ os .close (stderr_w )
42
+
43
+ if input is not None :
44
+ os .close (stdin_r )
45
+ with open (stdin_w , "w" ) as f :
46
+ f .write (input )
47
+
48
+ with open (stdout_r , "r" ) as f :
49
+ stdout = f .read ()
50
+ with open (stderr_r , "r" ) as f :
51
+ stderr = f .read ()
52
+
53
+ _ , wstatus = os .waitpid (pid , 0 )
54
+ if not os .WIFEXITED (wstatus ) or os .WEXITSTATUS (wstatus ) != 0 :
55
+ if os .WIFEXITED (wstatus ):
56
+ msg = f"Exited with status { os .WEXITSTATUS (wstatus )} "
57
+ elif os .WIFSIGNALED (wstatus ):
58
+ msg = f"Terminated by signal { os .WTERMSIG (wstatus )} "
59
+ else :
60
+ msg = "Exited abnormally"
61
+ self .fail (
62
+ f"""\
63
+ { msg }
64
+ STDOUT:
65
+ { stdout .decode ()}
66
+ STDERR:
67
+ { stderr .decode ()}
68
+ """
22
69
)
23
- except subprocess .CalledProcessError as e :
24
- # With captured output, there's nothing left to debug in CI logs.
25
- # Print output on a failure so we can debug.
26
- print (f"STDOUT:\n { e .stdout .decode ()} " )
27
- print (f"STDERR:\n { e .stderr .decode ()} " )
28
- raise
70
+
71
+ return types .SimpleNamespace (stdout = stdout , stderr = stderr )
29
72
30
73
def test_e (self ):
31
74
script = r"""
@@ -38,9 +81,9 @@ def test_e(self):
38
81
print(sys.argv)
39
82
"""
40
83
proc = self .run_cli (
41
- "--quiet" , "--pid" , "0" , "--no-default-symbols" , "-e" , script , "pass"
84
+ [ "--quiet" , "--pid" , "0" , "--no-default-symbols" , "-e" , script , "pass" ]
42
85
)
43
- self .assertEqual (proc .stdout , b "['-e', 'pass']\n " )
86
+ self .assertEqual (proc .stdout , "['-e', 'pass']\n " )
44
87
45
88
def test_script (self ):
46
89
with tempfile .NamedTemporaryFile () as f :
@@ -61,12 +104,12 @@ def test_script(self):
61
104
)
62
105
f .flush ()
63
106
proc = self .run_cli (
64
- "--quiet" , "--pid" , "0" , "--no-default-symbols" , f .name , "pass"
107
+ [ "--quiet" , "--pid" , "0" , "--no-default-symbols" , f .name , "pass" ]
65
108
)
66
- self .assertEqual (proc .stdout , f"[{ f .name !r} , 'pass']\n " . encode () )
109
+ self .assertEqual (proc .stdout , f"[{ f .name !r} , 'pass']\n " )
67
110
68
111
def test_pipe (self ):
69
- script = rb """
112
+ script = r """
70
113
import sys
71
114
72
115
assert drgn.get_default_prog() is prog
@@ -78,6 +121,6 @@ def test_pipe(self):
78
121
print(sys.argv)
79
122
"""
80
123
proc = self .run_cli (
81
- "--quiet" , "--pid" , "0" , "--no-default-symbols" , input = script
124
+ [ "--quiet" , "--pid" , "0" , "--no-default-symbols" ] , input = script
82
125
)
83
- self .assertEqual (proc .stdout , b "['']\n " )
126
+ self .assertEqual (proc .stdout , "['']\n " )
0 commit comments