-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathappium_manager.py
More file actions
259 lines (229 loc) · 10.3 KB
/
appium_manager.py
File metadata and controls
259 lines (229 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
import subprocess
import threading
import time
import os
import signal
from typing import Dict, List, Optional
from utils import get_logger
class AppiumInstance:
    """Manage one local Appium server subprocess listening on a single port.

    Covers starting/stopping the process, probing its HTTP status endpoints and
    restarting it (with a startup grace period, a restart backoff and a restart
    cap) when it is unhealthy.
    """

    def __init__(self, port: int, host: str = "127.0.0.1", node_options: Optional[str] = None,
                 use_jitless: bool = False, extra_args: Optional[List[str]] = None):
        """Configure the instance; no process is started here.

        Args:
            port: Port passed to ``appium -p`` and used by the health probes.
            host: Host the health probes connect to (it is not passed to appium).
            node_options: If set, replaces NODE_OPTIONS in the child environment.
            use_jitless: When ``node_options`` is unset, append ``--jitless`` to the
                inherited NODE_OPTIONS.
            extra_args: Additional arguments appended to the appium command line.
        """
        self.port = int(port)
        self.host = host
        self.node_options = node_options
        self.use_jitless = use_jitless
        # Own copy: a caller may pass the same list to several instances.
        self.extra_args: List[str] = list(extra_args) if extra_args else []
        self.process: Optional[subprocess.Popen] = None
        self.last_start_time = 0.0
        # Never reset, so max_restart in ensure_running() caps lifetime restarts.
        self.restart_count = 0
        self.alive = False
        self.last_exit_code: Optional[int] = None
        # Serializes start/stop/restart. A manager's health-monitor thread and its
        # wait_until_healthy()/recover_unhealthy() callers can drive the same
        # instance concurrently; without this both could restart it and spawn two
        # processes competing for one port. Reentrant because ensure_running()
        # calls stop() and start().
        self._lock = threading.RLock()
        self.logger = get_logger(app_name=f"APK_Manager_{self.port}")

    def is_starting(self, startup_grace=12) -> bool:
        """Return True if the process is running and was (re)started less than
        ``startup_grace`` seconds ago, i.e. it may simply not be serving yet."""
        if not self.process or self.process.poll() is not None:
            return False
        return (time.time() - self.last_start_time) < float(startup_grace)

    def start(self):
        """Launch the Appium server unless a live process already exists.

        stdout and stderr are merged into one pipe, which a daemon thread drains
        through stream_logs(). Left undrained, the pipe buffer would fill and
        block the Appium process on its next write.
        """
        with self._lock:
            if self.process and self.process.poll() is None:
                self.logger.info(f"Appium({self.port}) is already running")
                self.alive = True
                return
            env = os.environ.copy()
            if self.node_options:
                env['NODE_OPTIONS'] = self.node_options
            elif self.use_jitless:
                # strip() avoids a leading space when NODE_OPTIONS was unset.
                env['NODE_OPTIONS'] = (env.get('NODE_OPTIONS', '') + ' --jitless').strip()
            cmd = ["appium", "-p", str(self.port), "--log-level", "info"] + self.extra_args
            self.logger.info(f"Starting Appium: {' '.join(cmd)} NODE_OPTIONS={env.get('NODE_OPTIONS','')}")
            # errors="replace": a stray undecodable byte must not kill the drainer thread.
            self.process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                            text=True, errors="replace", env=env)
            self.last_start_time = time.time()
            self.last_exit_code = None
            self.alive = True
            # Hand the drainer this specific process: stop()/restart replace self.process.
            threading.Thread(target=self.stream_logs, args=(self.process,),
                             name=f"AppiumLog-{self.port}", daemon=True).start()
            self.logger.info(f"Appium({self.port}) started successfully pid={self.process.pid}")

    def stop(self, sig=signal.SIGTERM, timeout=5):
        """Stop the process: send ``sig``, wait up to ``timeout`` seconds, then kill.

        The killed child is reaped (bounded by ``timeout``) so it does not linger
        as a zombie. Afterwards ``last_exit_code`` holds the exit status (None
        only if the child could not be reaped), ``alive`` is False and
        ``process`` is None. Errors while signalling are logged, not raised.
        """
        with self._lock:
            proc = self.process
            if not proc:
                return
            if proc.poll() is None:
                try:
                    self.logger.info(f"Stopping Appium({self.port}) ...")
                    proc.send_signal(sig)
                    try:
                        proc.wait(timeout=timeout)
                    except subprocess.TimeoutExpired:
                        self.logger.warning(f"Appium({self.port}) is being forcefully killed")
                        proc.kill()
                        # Reap the child so the exit code below reflects the kill.
                        proc.wait(timeout=timeout)
                except Exception as e:
                    self.logger.warning(f"Error stopping Appium({self.port}): {e}")
            self.last_exit_code = proc.poll()
            self.alive = False
            self.process = None

    def is_healthy(self, timeout=2) -> bool:
        """Return True if the process is running and answers HTTP 200 on a status path.

        Tries ``/status`` then ``/wd/hub/status`` (the latter presumably for
        servers configured with the legacy ``/wd/hub`` base path). Each probe
        uses ``timeout`` seconds, and every connection is closed even when the
        probe fails.
        """
        if not self.process or self.process.poll() is not None:
            return False
        import http.client
        for path in ("/status", "/wd/hub/status"):
            conn = http.client.HTTPConnection(self.host, self.port, timeout=timeout)
            try:
                conn.request("GET", path)
                if conn.getresponse().status == 200:
                    return True
            except Exception:
                # Connection refused, timeout, malformed response: try the next path.
                continue
            finally:
                conn.close()
        return False

    def ensure_running(self, restart_backoff=5, max_restart=5, startup_grace=12, restart_delay=1.0):
        """Check the instance and restart it when it is unhealthy.

        A process that is still inside ``startup_grace`` is left alone. So is one
        started less than ``restart_backoff`` seconds ago. Nothing is restarted
        once ``restart_count`` has reached ``max_restart``. Otherwise the process
        is stopped, the method sleeps ``restart_delay`` seconds and starts it
        again.

        Returns:
            True if the server is healthy or a restart was just issued (the new
            process may not be serving yet); otherwise False.

        Raises:
            Whatever start() raises (e.g. the appium executable is missing). The
            attempt is still counted and time-stamped, so backoff and
            ``max_restart`` continue to apply.
        """
        with self._lock:
            if self.process and self.process.poll() is not None:
                exit_code = self.process.poll()
                if self.alive:
                    self.logger.error(f"Appium({self.port}) process has exited, exit_code={exit_code}")
                self.last_exit_code = exit_code
                self.alive = False
            if self.is_healthy():
                return True
            # Process is still alive and within startup grace period: don't restart, wait for it to be ready
            if self.is_starting(startup_grace=startup_grace):
                return False
            if time.time() - self.last_start_time < restart_backoff:
                return False
            if self.restart_count >= max_restart:
                if self.alive:
                    self.logger.error(f"Appium({self.port}) reached max restart count {max_restart}")
                return False
            self.logger.warning(f"Appium({self.port}) is unhealthy, attempting restart (count={self.restart_count})")
            self.stop()
            time.sleep(restart_delay)
            # Count and time-stamp the attempt before start(): if start() raises,
            # the backoff and the max_restart cap must still take effect instead
            # of retrying on every health tick forever.
            self.restart_count += 1
            self.last_start_time = time.time()
            self.start()
            self.logger.info(f"Appium({self.port}) restart completed, total restart count={self.restart_count}")
            return True

    def stream_logs(self, process: Optional[subprocess.Popen] = None):
        """Consume a process's merged output until EOF, logging selected lines at DEBUG.

        Lines containing 'HTTP', 'ERROR' or 'Appium' are forwarded to the logger.
        Other lines are read and discarded so the pipe never fills. The stream
        is closed once drained.

        Args:
            process: Process to drain. Defaults to the current ``self.process``.
        """
        proc = process if process is not None else self.process
        if not proc or not proc.stdout:
            return
        stream = proc.stdout
        try:
            for line in stream:
                if 'HTTP' in line or 'ERROR' in line or 'Appium' in line:
                    self.logger.debug(line.rstrip())
        except (ValueError, OSError):
            # Stream already closed (e.g. by another reader); nothing left to drain.
            pass
        finally:
            try:
                stream.close()
            except OSError:
                pass
class AppiumManager:
def __init__(self, ports: List[int] | List[str], host: str = "127.0.0.1",
node_options: Optional[str] = None, use_jitless: bool = False,
extra_args: Optional[List[str]] = None, auto_start=True,
health_interval=10):
self.instances: Dict[int, AppiumInstance] = {}
self.host = host
self.node_options = node_options
self.use_jitless = use_jitless
self.extra_args = extra_args or []
self.health_interval = health_interval
self.logger = get_logger(app_name="APK_Manager")
for p in ports:
inst = AppiumInstance(int(p), host, node_options, use_jitless, self.extra_args)
self.instances[int(p)] = inst
self._stop_flag = threading.Event()
self._health_thread: Optional[threading.Thread] = None
if auto_start:
self.start_all()
self.start_health_monitor()
def start_all(self):
for inst in self.instances.values():
try:
inst.start()
except Exception as e:
self.logger.error(f"Failed to start Appium({inst.port}): {e}")
def wait_until_healthy(self, timeout=60, poll_interval=1.0, startup_grace=12) -> bool:
"""
Block waiting for all Appium instances to respond healthy on /status.
If a process exits during waiting, attempt to restart it immediately.
"""
deadline = time.time() + max(1, int(timeout))
while time.time() < deadline:
all_ok = True
for _, inst in self.instances.items():
try:
if inst.is_healthy():
continue
all_ok = False
# If process is still within startup grace period, just wait; restart only after exit
inst.ensure_running(restart_backoff=5, max_restart=10, startup_grace=startup_grace)
except Exception as e:
all_ok = False
self.logger.warning(f"Failed health check while waiting port={inst.port}: {e}")
if all_ok:
self.logger.info(f"All Appium instances are ready: {self.summary()}")
return True
time.sleep(float(poll_interval))
self.logger.error(f"Timeout waiting for Appium to be ready ({timeout}s): {self.summary()}")
return False
def recover_unhealthy(self, max_restart=10, startup_grace=12) -> dict:
"""
Immediately attempt to recover unhealthy instances and return latest summary.
"""
for _, inst in self.instances.items():
try:
if not inst.is_healthy():
inst.ensure_running(restart_backoff=5, max_restart=max_restart, startup_grace=startup_grace)
except Exception as e:
self.logger.warning(f"Failed to recover unhealthy instance port={inst.port}: {e}")
return self.summary()
def start_health_monitor(self):
if self._health_thread and self._health_thread.is_alive():
return
def loop():
self.logger.info("Appium health monitor thread started")
tick = 0
summary_every = max(1, int(60 / max(1, int(self.health_interval))))
while not self._stop_flag.is_set():
for port, inst in self.instances.items():
try:
inst.ensure_running()
except Exception as e:
self.logger.error(f"Health check exception port={port}: {e}")
tick += 1
if tick % summary_every == 0:
try:
if self.any_unhealthy():
self.logger.warning(f"Appium health summary (with issues): {self.summary()}")
except Exception as e:
self.logger.warning(f"Failed to record Appium health summary: {e}")
time.sleep(self.health_interval)
self.logger.info("Appium health monitor thread ended")
self._health_thread = threading.Thread(target=loop, name="AppiumHealth", daemon=True)
self._health_thread.start()
def get_ports(self) -> List[str]:
return [str(p) for p in self.instances.keys()]
def shutdown_all(self):
self._stop_flag.set()
if self._health_thread and self._health_thread.is_alive():
self._health_thread.join(timeout=3)
for inst in self.instances.values():
try:
inst.stop()
except Exception:
pass
self.logger.info("All Appium instances have been shut down")
def any_unhealthy(self) -> bool:
for inst in self.instances.values():
if not inst.is_healthy():
return True
return False
def summary(self) -> dict:
return {
p: {
"pid": (inst.process.pid if inst.process else None),
"alive": bool(inst.process and inst.process.poll() is None),
"healthy": inst.is_healthy(),
"restart_count": inst.restart_count,
"last_exit_code": inst.last_exit_code,
}
for p, inst in self.instances.items()
}
# Process-wide AppiumManager, created lazily by get_appium_manager().
_global_manager: Optional[AppiumManager] = None


def get_appium_manager(ports: List[int] | List[str], **kwargs) -> AppiumManager:
    """Return the process-wide AppiumManager, building it on the first call.

    Only the first call's ``ports`` and ``kwargs`` take effect. Later calls
    return the already-built manager and ignore their arguments.
    """
    global _global_manager
    if _global_manager is not None:
        return _global_manager
    _global_manager = AppiumManager(ports, **kwargs)
    return _global_manager