-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprotocol.py
More file actions
39 lines (32 loc) · 1.16 KB
/
protocol.py
File metadata and controls
39 lines (32 loc) · 1.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
from typing import Any
import msgpack
# 8-byte ASCII tag, UTF-8 encoded. NOTE(review): presumably the marker that
# prefixes each frame on the wire; it is not referenced by the code visible
# in this module, so confirm against the callers.
IDENTIFIER = "MIOTYB01".encode("utf-8")
def decode_messages(data: bytes) -> list[dict[str, Any]]:
    """Decode every complete frame found at the start of *data*.

    Each frame is read as a 12-byte header followed by a msgpack payload:
    bytes 0-7 are skipped (they are not validated here) and bytes 8-11 hold
    the payload length as a little-endian unsigned integer.

    Args:
        data: Buffer holding zero or more back-to-back frames.

    Returns:
        All objects unpacked from the complete frames, in order. Parsing
        stops at the first frame whose payload is not fully contained in
        *data*; a frame whose payload fails to unpack is reported on stdout
        and skipped.
    """
    prefix_size = 8  # leading bytes that precede the length field
    header_size = 12  # prefix + 4-byte little-endian payload length
    total = len(data)

    messages = []
    offset = 0  # cursor into data; avoids re-copying the tail on every frame
    while total - offset > header_size:
        length = int.from_bytes(
            data[offset + prefix_size : offset + header_size], byteorder="little"
        )
        payload_end = offset + header_size + length
        # The whole frame (header + payload) must be present; comparing with
        # only the 8-byte prefix would let a truncated payload through.
        if payload_end > total:
            break
        payload = data[offset + header_size : payload_end]
        # Advance before decoding so that a failure can never leave the
        # cursor in place and spin the loop forever.
        offset = payload_end
        try:
            unpacker = msgpack.Unpacker(raw=False, strict_map_key=False)
            unpacker.feed(payload)
            # Append one by one so objects unpacked before a mid-payload
            # error are still kept.
            for msg in unpacker:
                messages.append(msg)
        except Exception as e:
            # Best effort: report the bad frame and carry on with the next.
            print(f"[ERROR] Fehler beim Dekodieren der Nachricht: {e}")
    return messages
def decode_message(data: bytes) -> dict[str, Any]:
    """Unpack the first msgpack object contained in *data*.

    Returns an empty dict when *data* yields no complete object or when
    unpacking raises; in the latter case the error is printed to stdout.
    """
    try:
        stream = msgpack.Unpacker(raw=False, strict_map_key=False)
        stream.feed(data)
        # Only the first object is of interest; fall back to {} if none.
        return next(iter(stream), {})
    except Exception as e:
        print(f"[ERROR] Error decoding message: {e}")
    return {}
def encode_message(data: dict[str, Any]) -> bytes:
    """Serialize *data* with msgpack and return the result as ``bytes``."""
    return bytes(msgpack.packb(data))