1- import json
21from logging import Logger
32from multiprocessing .connection import Connection
43from typing import Any , Callable , TypeVar
54
5+ import msgpack
6+
67DeserializedType = TypeVar ("DeserializedType" )
78
89
@@ -21,7 +22,7 @@ def send_serializable(
2122 obj : Any ,
2223 logger : Logger ,
2324 chunk_size : int = 1_000_000 ,
24- serializer : Callable [[Any ], str ] = json . dumps ,
25+ serializer : Callable [[Any ], bytes ] = msgpack . packb ,
2526) -> None :
2627 """Send any serializable object in chunks to avoid pipe buffer issues.
2728
@@ -30,15 +31,14 @@ def send_serializable(
3031 obj: Any serializable object to send
3132 logger: Logger for debugging
3233 chunk_size: The size of each chunk in bytes
33- serializer: Function to serialize object to string (default: json.dumps )
34+ serializer: Function to serialize object to bytes (default: msgpack.packb )
3435 """
35- serialized = serializer (obj )
36- encoded = serialized .encode ("utf-8" )
36+ packed_data = serializer (obj )
3737
38- logger .debug (f"Sending object of size { len (encoded )} bytes" )
38+ logger .debug (f"Sending object of size { len (packed_data )} bytes" )
3939
40- for i in range (0 , len (serialized ), chunk_size ):
41- chunk = serialized [i : i + chunk_size ]
40+ for i in range (0 , len (packed_data ), chunk_size ):
41+ chunk = packed_data [i : i + chunk_size ]
4242 connection .send (chunk )
4343 logger .debug (f"Sent chunk of size { len (chunk )} bytes" )
4444
@@ -48,14 +48,14 @@ def send_serializable(
4848
4949
5050def receive_serializable (
51- connection : Connection , logger : Logger , deserializer : Callable [[str ], DeserializedType ] = json . loads
51+ connection : Connection , logger : Logger , deserializer : Callable [[bytes ], DeserializedType ] = msgpack . unpackb
5252) -> DeserializedType :
5353 """Receive any serializable object in chunks to avoid pipe buffer issues.
5454
5555 Args:
5656 connection: The connection to receive chunks from
5757 logger: Logger for debugging
58- deserializer: Function to deserialize string back to object (default: json.loads )
58+ deserializer: Function to deserialize bytes back to object (default: msgpack.unpackb )
5959
6060 Returns:
6161 The complete deserialized object
@@ -71,8 +71,8 @@ def receive_serializable(
7171 logger .debug (f"Received chunk of size { len (chunk )} bytes" )
7272
7373 logger .debug ("Finished receiving object" )
74- serialized_str = "" .join (chunks )
75- obj = deserializer (serialized_str )
76- logger .debug (f"Received object of size { len (serialized_str )} bytes" )
74+ packed_data = b "" .join (chunks )
75+ obj = deserializer (packed_data )
76+ logger .debug (f"Received object of size { len (packed_data )} bytes" )
7777
7878 return obj
0 commit comments