-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathprotocol.py
More file actions
36 lines (25 loc) · 1.12 KB
/
protocol.py
File metadata and controls
36 lines (25 loc) · 1.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
"""Shared serialization for robot observations and actions.
Uses msgpack with numpy support for compact, fast binary encoding.
Both server.py and client.py import from here.
"""
import msgpack
import msgpack_numpy as m
import numpy as np
# Patch msgpack to handle numpy dtypes natively
m.patch()
def pack_obs(obs: dict[str, np.ndarray]) -> bytes:
"""Serialize observation dict (str -> numpy array) to msgpack bytes."""
return msgpack.packb(obs, default=m.encode)
def unpack_obs(data: bytes) -> dict[str, np.ndarray]:
"""Deserialize msgpack bytes to observation dict."""
return msgpack.unpackb(data, object_hook=m.decode, raw=False)
def pack_response(action: np.ndarray, inference_ms: float) -> bytes:
"""Serialize action + server timing metadata."""
return msgpack.packb(
{"action": action, "inference_ms": inference_ms},
default=m.encode,
)
def unpack_response(data: bytes) -> tuple[np.ndarray, float]:
"""Deserialize action response. Returns (action_array, inference_ms)."""
resp = msgpack.unpackb(data, object_hook=m.decode, raw=False)
return resp["action"], resp["inference_ms"]