-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
65 lines (56 loc) · 1.66 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import consts
import socketio
import threading
import led
import time
import id
def listenConsole():
    """Read LED pattern numbers from standard input until told to stop.

    Each line is read with ``input()``. The literal line ``close`` ends the
    listener; any other line is converted with ``int()`` and passed to
    ``led.changePattern``. Lines that are not valid integers are reported on
    stdout and skipped, so a typo does not kill the console thread.
    """
    print("listening to console")
    while True:
        try:
            message = input()
        except EOFError:
            # stdin was closed (e.g. no terminal attached): nothing more to read.
            return
        if message == "close":
            return
        try:
            pattern = int(message)
        except ValueError:
            print("invalid pattern", message)
            continue
        led.changePattern(pattern)
def listen():
    """Receive events from the module-level ``socket`` and dispatch them.

    Loops on ``socket.receive`` with a ``consts.WS_TIMEOUT`` timeout and hands
    every received event to ``on_message``.

    * A receive timeout is printed and the loop keeps waiting.
    * A ``DisconnectedError`` from ``receive`` is reported through ``onError``
      and ends the listener, since no further event can arrive.
    * Any exception raised while handling an event is reported through
      ``onError`` and the loop continues, so one bad event does not stop the
      listener thread.
    """
    print("starting listener")
    while True:
        # Keep the receive and the dispatch in separate try blocks so a
        # handler error is never mistaken for a transport error.
        try:
            event = socket.receive(timeout=consts.WS_TIMEOUT)
        except socketio.exceptions.TimeoutError as e:
            print(e)
            continue
        except socketio.exceptions.DisconnectedError as e:
            print("websocket disconnected, stopping listener")
            onError(e)
            return

        try:
            on_message(event)
        except Exception as e:
            # Thread boundary: report the failure and keep listening.
            print("error while handling event", event, e)
            onError(e)
def on_message(event):
    """Dispatch one received event to the matching ``led`` / ``id`` call.

    Args:
        event: Sequence whose first item is the event name and whose
            remaining items are the arguments of that event.

    Handled event names are ``setBucketState`` (one argument, forwarded to
    ``led.changePattern``), ``changeBrightness`` (one argument, forwarded to
    ``led.changeBrightness``) and ``changeID`` (two arguments, forwarded to
    ``id.changeID``). Unknown names are printed as "unknown event". An empty
    event, or a known event carrying fewer arguments than it needs, is printed
    as "malformed event" and ignored instead of raising ``IndexError``.
    """
    print("message")

    # Number of payload arguments each known event reads.
    required_args = {"setBucketState": 1, "changeBrightness": 1, "changeID": 2}

    if not event:
        print("malformed event", event)
        return

    name = event[0]
    # Only look up string names so an unhashable first item cannot raise here.
    required = required_args.get(name) if isinstance(name, str) else None
    if required is None:
        print("unknown event", event)
        return
    if len(event) - 1 < required:
        print("malformed event", event)
        return

    if name == "setBucketState":
        pattern = event[1]
        print("setting pattern", pattern)
        led.changePattern(pattern)
    elif name == "changeBrightness":
        led.changeBrightness(event[1])
    elif name == "changeID":
        id.changeID(event[1], event[2])  # might add a type check later on
def onError(e: Exception):
    """Report an error to the server, or on the LEDs when offline.

    Args:
        e: The exception to report. Its ``str()`` form is what gets emitted.

    When the module-level ``socket`` is connected, a ``robotTargetError``
    event carrying the message is emitted. Otherwise ``consts.ERROR_COLOR`` is
    shown through ``led.displayLog``.
    """
    if not socket.connected:
        led.displayLog(consts.ERROR_COLOR)
        return
    socket.emit("robotTargetError", str(e))
# init
# Shared Socket.IO client; listen() and onError() read this module-level name.
socket = socketio.SimpleClient()

# setup
if consts.USE_WEBSOCKET:
    try:
        socket.connect(
            consts.WS_URL,
            auth={"role": "robot", "key": consts.WS_KEY, "robot_id": id.getID()},
        )
        print("connected websocket")
        # Daemon thread: it blocks in receive() forever and must not keep the
        # process alive once the main loop has stopped.
        threading.Thread(target=listen, daemon=True).start()
    except socketio.exceptions.ConnectionError as e:
        print(e)
        led.displayLog(consts.ERROR_COLOR)

if consts.USE_CONSOLE_INPUT:
    # Daemon thread for the same reason: it blocks in input().
    threading.Thread(target=listenConsole, daemon=True).start()

# loop
try:
    while True:
        led.ledControl.updatePattern()
        led.ledControl.show()
        time.sleep(consts.LED_INTERVAL_TIMEOUT)
except KeyboardInterrupt:
    # Ctrl+C is the normal way to stop the script; exit without a traceback.
    print("shutting down")
finally:
    # Close the connection explicitly on any exit path if it is still open.
    if socket.connected:
        socket.disconnect()