-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathTelemetry.py
More file actions
73 lines (60 loc) · 1.89 KB
/
Telemetry.py
File metadata and controls
73 lines (60 loc) · 1.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
import json
import socket
from threading import Thread
import time
from Task import Task
from enums import LogPriority
class Telemetry:
    """TCP link to the ground station plus a priority-filtered logger.

    On construction the object opens a TCP connection to ``(GS_IP, GS_PORT)``
    and, if that succeeds, starts a daemon thread running :meth:`listen`.
    Every message received is decoded as JSON and handed to ``add_task``
    wrapped in a ``Task``.

    ``DELAY``, ``SOCKETIO_HOST`` and ``SOCKETIO_PORT`` are only stored on the
    instance; nothing in this class reads them.
    """

    # Seconds to wait between failed reconnect attempts so that a ground
    # station that is down does not cause a busy loop.
    RECONNECT_DELAY = 1.0

    def __init__(self, GS_IP, GS_PORT, DELAY, SOCKETIO_HOST, SOCKETIO_PORT, add_task):
        """Store the configuration and try to connect to the ground station.

        Args:
            GS_IP: Host of the ground station.
            GS_PORT: TCP port of the ground station.
            DELAY: Stored as ``self.DELAY``; unused in this class.
            SOCKETIO_HOST: Stored as ``self.SOCKETIO_HOST``; unused here.
            SOCKETIO_PORT: Stored as ``self.SOCKETIO_PORT``; unused here.
            add_task: Callable invoked with a ``Task`` for every valid
                message received from the ground station.

        A ``TimeoutError`` during the initial connect is reported on stdout
        and the listener thread is then NOT started. Any other connection
        error propagates to the caller.
        """
        self.GS_IP = GS_IP
        self.GS_PORT = GS_PORT
        self.DELAY = DELAY
        self.SOCKETIO_HOST = SOCKETIO_HOST
        self.SOCKETIO_PORT = SOCKETIO_PORT
        self.add_task = add_task
        self.CHUNK_SIZE = 4096  # maximum number of bytes read per recv()

        # This is where you specify what gets logged
        self.log_level = LogPriority.WARN

        self.socket: socket.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listener = Thread(target=self.listen, daemon=True)

        try:
            self.connect()
            self.listener.start()
        except TimeoutError:
            print("Failed to connect to ground station: Timeout")

    def connect(self):
        """Connect ``self.socket`` to the ground station address."""
        self.socket.connect((self.GS_IP, self.GS_PORT))

    def listen(self):
        """Receive messages forever and turn them into tasks.

        Runs on the listener thread. ``recv`` returning an empty bytes object
        means the peer closed the connection; that (or a socket error) starts
        the reconnect procedure. Non-empty data is parsed and dispatched.
        """
        while True:
            try:
                data = self.socket.recv(self.CHUNK_SIZE)
            except OSError:
                # A reset or otherwise broken socket is handled like a close.
                data = b""

            if data:
                # NOTE(review): assumes one recv() yields exactly one complete
                # JSON document; TCP does not guarantee this - confirm framing.
                parsed = self._parse_task(data)
                if parsed is None:
                    print("Ignoring malformed message from ground station")
                else:
                    action, payload = parsed
                    self.add_task(Task(action, payload))
                continue

            # Must have disconnected
            print("Disconnected! Attempting to reconnect...")
            self._reconnect()

    @staticmethod
    def _parse_task(data):
        """Decode a raw message into ``(action, payload)``; ``None`` if invalid."""
        try:
            message = json.loads(data)
            return message['action'], message['payload']
        except (ValueError, KeyError, TypeError):
            # ValueError covers invalid JSON and invalid UTF-8; KeyError and
            # TypeError cover JSON that is not an object with both keys.
            return None

    def _reconnect(self):
        """Replace the dead socket and retry until the connection succeeds."""
        while True:
            # A socket that was connected once cannot be connected again, so
            # every attempt uses a fresh one.
            self.socket.close()
            self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.connect()
                return
            except TimeoutError:
                print("Failed to connect to ground station: Timeout")
            except OSError as err:
                print(f"Failed to connect to ground station: {err}")
            time.sleep(self.RECONNECT_DELAY)

    def critical(self, type, payload):
        """Log ``payload`` with ``LogPriority.CRITICAL``."""
        self.log(type, payload, LogPriority.CRITICAL)

    def warn(self, type, payload):
        """Log ``payload`` with ``LogPriority.WARN``."""
        self.log(type, payload, LogPriority.WARN)

    def info(self, type, payload):
        """Log ``payload`` with ``LogPriority.INFO``."""
        self.log(type, payload, LogPriority.INFO)

    def log(self, type, payload, priority):
        """Print a JSON packet if ``priority`` is at least ``self.log_level``.

        Args:
            type: Value stored under the packet's ``"type"`` key.
            payload: JSON-serialisable value stored under ``"payload"``.
            priority: Compared with ``self.log_level`` using ``>=``.

        The packet is only printed to stdout; this method does not write to
        the socket. The ``"Info:"`` prefix is used for every priority.
        """
        if priority >= self.log_level:
            packet = {
                "type": type,
                "payload": payload
            }
            print("Info:", json.dumps(packet))

    @classmethod
    def from_config(cls, config, on_message):
        """Build a ``Telemetry`` from a config mapping.

        Args:
            config: Mapping with the keys ``GS_IP``, ``GS_PORT``, ``DELAY``,
                ``SOCKETIO_HOST`` and ``SOCKETIO_PORT``.
            on_message: Passed to the constructor as ``add_task``.

        Returns:
            The new instance.

        Raises:
            KeyError: If one of the keys is missing from ``config``.
        """
        GS_IP = config['GS_IP']
        GS_PORT = config['GS_PORT']
        DELAY = config['DELAY']
        SOCKETIO_HOST = config['SOCKETIO_HOST']
        SOCKETIO_PORT = config['SOCKETIO_PORT']
        return cls(GS_IP, GS_PORT, DELAY, SOCKETIO_HOST, SOCKETIO_PORT, on_message)