This repository was archived by the owner on Nov 25, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 29
/
Copy pathcmd.py
126 lines (109 loc) · 3.55 KB
/
cmd.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/python3
# -*- coding: UTF-8 -*-
import sys
import time
import subprocess
import unittest
sys.path.append("..")
from core.global_config import logger, MAX_TIMEOUT_SECOND # noqa
def run_cmd(cmd, wait_interval_sec=5, max_timeout_sec=MAX_TIMEOUT_SECOND):
    """Run ``cmd`` through the shell and return its non-empty output lines.

    The command is started with ``shell=True`` and both stdout and stderr are
    captured as UTF-8 text. The pipes are drained while waiting for the
    process, so a command that prints more than the OS pipe buffer can hold
    cannot stall against an unread pipe and be misreported as a timeout.

    Args:
        cmd: Command line handed to the shell.
        wait_interval_sec: Unused. Kept so existing positional and keyword
            callers keep working; it used to pace a polling loop that could
            never run because the process had already been waited for.
        max_timeout_sec: Seconds to wait before the subprocess is killed.

    Returns:
        A list with the non-empty lines of stdout followed by stderr when the
        exit status is 0 or 139, or ``None`` when the command timed out.

    Raises:
        SystemExit: With code 1 when the command exits with any status other
            than 0 or 139.
    """
    cmd_type = "CMD"
    logger.info("{}> {}".format(cmd_type, cmd))
    started_at = time.monotonic()

    # NOTE(security): shell=True passes ``cmd`` to the shell verbatim. Callers
    # must not build it from untrusted input.
    # The context manager closes both pipes and reaps the child on every exit
    # path, including the timeout path below.
    with subprocess.Popen(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        encoding="utf-8",
    ) as subp:
        try:
            # communicate() reads both pipes while it waits; a bare wait()
            # with PIPE can deadlock once the child fills a pipe.
            stdout_text, stderr_text = subp.communicate(
                timeout=max_timeout_sec
            )
        except subprocess.TimeoutExpired:
            logger.error(
                "TimeoutExpired {} seconds: {}, let's kill this subprocess".format(
                    max_timeout_sec, cmd
                )  # noqa
            )
            subp.kill()
            return None
        subp_status = subp.returncode

    if subp_status == 0:
        logger.debug("{} finished".format(cmd_type))
    else:
        logger.fatal(
            "exited with abnormal subprocess status: {}".format(subp_status)
        )
        # 139 is tolerated and its output is still returned; presumably it is
        # the shell's report of a segfault (128 + SIGSEGV) -- TODO confirm.
        if subp_status != 139:
            sys.exit(1)

    duration_sec = round(time.monotonic() - started_at, 3)
    logger.debug(
        "{} consume {} seconds to finish".format(cmd_type, duration_sec)
    )
    logger.debug("cmd_res:{}".format((stdout_text, stderr_text)))

    merged_output = stdout_text + stderr_text
    return [line for line in merged_output.split("\n") if line != ""]
def run_cmds(cmds, interval_second=0, wait_timeout_second=60):
    """Run every command in ``cmds`` one after another via ``run_cmd``.

    Args:
        cmds: Iterable of command strings.
        interval_second: Forwarded to ``run_cmd`` as its second argument.
        wait_timeout_second: Forwarded to ``run_cmd`` as its third argument.

    Returns:
        A dict mapping each command string to whatever ``run_cmd`` returned
        for it. A command that appears more than once keeps only the result
        of its last run.
    """
    # Snapshot the iterable before launching anything, so a lazy or failing
    # iterable is fully consumed up front.
    pending = list(cmds)
    return {
        command: run_cmd(command, interval_second, wait_timeout_second)
        for command in pending
    }
class TestCmd(unittest.TestCase):
    """Smoke tests that run a few shell commands and log what comes back."""

    def setUp(self):
        # Log "<class name> <name of this method>" before each test.
        hook_name = sys._getframe().f_code.co_name
        logger.info("{} {}".format(type(self).__name__, hook_name))

    def tearDown(self):
        # Log "<class name> <name of this method>" after each test.
        hook_name = sys._getframe().f_code.co_name
        logger.info("{} {}".format(type(self).__name__, hook_name))

    def test_run_cmds(self):
        # Run the whole batch first, then log each command with its result.
        cmds = ["ls", "ls -l", "adb devices"]
        results_by_cmd = run_cmds(cmds)
        for position, command in enumerate(cmds):
            outcome = results_by_cmd[command]
            logger.info("cmd_idx:{}, cmd:{}".format(position, command))
            logger.info(outcome)

    def test_run_cmd(self):
        # Run the commands one at a time, logging each result as it arrives.
        for position, command in enumerate(["ls", "pwd", "adb devices"]):
            outcome = run_cmd(command)
            logger.info("cmd_idx:{}, cmd:{}".format(position, command))
            logger.info(outcome)
if __name__ == "__main__":
    # Executed directly as a script: hand control to unittest's runner.
    unittest.main()