Skip to content

Commit 98d82dd

Browse files
committed
Pack per frame metadata into one message
To send fewer and larger messages, pack both which frames are on device and how large each frame is into one message.
1 parent 7b3cecd commit 98d82dd

File tree

1 file changed

+8
-11
lines changed

1 file changed

+8
-11
lines changed

distributed/comm/ucx.py

+8-11
Original file line numberDiff line numberDiff line change
@@ -191,8 +191,9 @@ async def write(
191191

192192
# Send meta data
193193
await self.ep.send(struct.pack("Q", nframes))
194-
await self.ep.send(struct.pack(nframes * "?", *cuda_frames))
195-
await self.ep.send(struct.pack(nframes * "Q", *sizes))
194+
await self.ep.send(
195+
struct.pack(nframes * "?" + nframes * "Q", *cuda_frames, *sizes)
196+
)
196197

197198
# Send frames
198199

@@ -226,15 +227,11 @@ async def read(self, deserializers=("cuda", "dask", "pickle", "error")):
226227
await self.ep.recv(nframes)
227228
(nframes,) = struct.unpack(nframes_fmt, nframes)
228229

229-
cuda_frames_fmt = nframes * "?"
230-
cuda_frames = host_array(struct.calcsize(cuda_frames_fmt))
231-
await self.ep.recv(cuda_frames)
232-
cuda_frames = struct.unpack(cuda_frames_fmt, cuda_frames)
233-
234-
sizes_fmt = nframes * "Q"
235-
sizes = host_array(struct.calcsize(sizes_fmt))
236-
await self.ep.recv(sizes)
237-
sizes = struct.unpack(sizes_fmt, sizes)
230+
header_fmt = nframes * "?" + nframes * "Q"
231+
header = host_array(struct.calcsize(header_fmt))
232+
await self.ep.recv(header)
233+
header = struct.unpack(header_fmt, header)
234+
cuda_frames, sizes = header[:nframes], header[nframes:]
238235
except (ucp.exceptions.UCXBaseException, CancelledError):
239236
self.abort()
240237
raise CommClosedError("While reading, the connection was closed")

0 commit comments

Comments
 (0)