1+ import pickle
12from logging import Logger
23from multiprocessing .connection import Connection
34from typing import Any , Callable , TypeVar
45
5- import msgpack
6-
76DeserializedType = TypeVar ("DeserializedType" )
87
98
@@ -22,7 +21,7 @@ def send_serializable(
2221 obj : Any ,
2322 logger : Logger ,
2423 chunk_size : int = 1_000_000 ,
25- serializer : Callable [[Any ], bytes ] = msgpack . packb ,
24+ serializer : Callable [[Any ], bytes ] = pickle . dumps ,
2625) -> None :
2726 """Send any serializable object in chunks to avoid pipe buffer issues.
2827
@@ -31,14 +30,14 @@ def send_serializable(
3130 obj: Any serializable object to send
3231 logger: Logger for debugging
3332 chunk_size: The size of each chunk in bytes
34- serializer: Function to serialize object to bytes (default: msgpack.packb )
33+ serializer: Function to serialize object to bytes (default: pickle.dumps )
3534 """
36- packed_data = serializer (obj )
35+ serialized = serializer (obj )
3736
38- logger .debug (f"Sending object of size { len (packed_data )} bytes" )
37+ logger .debug (f"Sending object of size { len (serialized )} bytes" )
3938
40- for i in range (0 , len (packed_data ), chunk_size ):
41- chunk = packed_data [i : i + chunk_size ]
39+ for i in range (0 , len (serialized ), chunk_size ):
40+ chunk = serialized [i : i + chunk_size ]
4241 connection .send (chunk )
4342 logger .debug (f"Sent chunk of size { len (chunk )} bytes" )
4443
@@ -48,14 +47,14 @@ def send_serializable(
4847
4948
5049def receive_serializable (
51- connection : Connection , logger : Logger , deserializer : Callable [[bytes ], DeserializedType ] = msgpack . unpackb
50+ connection : Connection , logger : Logger , deserializer : Callable [[bytes ], DeserializedType ] = pickle . loads
5251) -> DeserializedType :
5352 """Receive any serializable object in chunks to avoid pipe buffer issues.
5453
5554 Args:
5655 connection: The connection to receive chunks from
5756 logger: Logger for debugging
58- deserializer: Function to deserialize bytes back to object (default: msgpack.unpackb )
57+ deserializer: Function to deserialize bytes back to object (default: pickle.loads )
5958
6059 Returns:
6160 The complete deserialized object
@@ -71,8 +70,8 @@ def receive_serializable(
7170 logger .debug (f"Received chunk of size { len (chunk )} bytes" )
7271
7372 logger .debug ("Finished receiving object" )
74- packed_data = b"" .join (chunks )
75- obj = deserializer (packed_data )
76- logger .debug (f"Received object of size { len (packed_data )} bytes" )
73+ serialized_data = b"" .join (chunks )
74+ obj = deserializer (serialized_data )
75+ logger .debug (f"Received object of size { len (serialized_data )} bytes" )
7776
7877 return obj
0 commit comments