-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathsupervisor.py
More file actions
102 lines (81 loc) · 3.15 KB
/
supervisor.py
File metadata and controls
102 lines (81 loc) · 3.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#!/usr/bin/env python3
import asyncio
import subprocess
import sys
import time
import logging
import os
from PetkitW5BLEMQTT import Constants, BLEManager
from bleak import BleakScanner
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - PetkitW5BLEMQTT-Supervisor - %(levelname)s - %(message)s"
)
logger = logging.getLogger("PetkitSupervisor")
# Resolve absolute path to main.py
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MAIN_PATH = os.path.join(BASE_DIR, "main.py")
def start_worker(address, mqtt_args):
"""Start a worker process running main.py for a specific device."""
cmd = [
sys.executable,
MAIN_PATH, # <-- FIXED PATH
"--address", address,
"--logging_level", "INFO"
]
if mqtt_args:
cmd.append("--mqtt")
if mqtt_args.get("broker"):
cmd += ["--mqtt_broker", mqtt_args["broker"]]
if mqtt_args.get("port"):
cmd += ["--mqtt_port", str(mqtt_args["port"])]
if mqtt_args.get("username"):
cmd += ["--mqtt_user", mqtt_args["username"]]
if mqtt_args.get("password"):
cmd += ["--mqtt_password", mqtt_args["password"]]
logger.info(f"Launching worker for {address}: {' '.join(cmd)}")
return subprocess.Popen(cmd)
async def supervisor(mqtt_args):
workers = {}
ble_manager = BLEManager(event_handler=None, commands=None, logger=logger)
while True:
try:
found = await ble_manager.scan()
# Start workers for new devices
for address in found:
if address not in workers:
workers[address] = start_worker(address, mqtt_args)
time.sleep(1)
# Restart crashed workers
for address, proc in list(workers.items()):
if proc.poll() is not None:
logger.warning(
f"Worker for {address} exited with code {proc.returncode}, restarting..."
)
workers[address] = start_worker(address, mqtt_args)
await asyncio.sleep(10)
except KeyboardInterrupt:
logger.info("Supervisor interrupted, shutting down workers...")
for proc in workers.values():
proc.terminate()
for proc in workers.values():
proc.wait()
break
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Petkit Supervisor")
parser.add_argument("--mqtt", action="store_true")
parser.add_argument("--mqtt_broker", type=str)
parser.add_argument("--mqtt_port", type=int, default=1883)
parser.add_argument("--mqtt_user", type=str)
parser.add_argument("--mqtt_password", type=str)
args = parser.parse_args()
mqtt_args = None
if args.mqtt:
mqtt_args = {
"broker": args.mqtt_broker,
"port": args.mqtt_port,
"username": args.mqtt_user,
"password": args.mqtt_password,
}
asyncio.run(supervisor(mqtt_args))