Skip to content

Commit 278c38a

Browse files
committed
Merge branch 'branch-25.06' of github.com:rapidsai/rapidsmpf into multi-packed-data-chunk
2 parents baad9c0 + 94d5257 commit 278c38a

File tree

24 files changed

+775
-249
lines changed

24 files changed

+775
-249
lines changed

cpp/include/rapidsmpf/buffer/buffer.hpp

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,22 @@
55
#pragma once
66

77
#include <array>
8+
#include <atomic>
89
#include <memory>
10+
#include <mutex>
911
#include <variant>
1012
#include <vector>
1113

14+
#include <cuda_runtime.h>
15+
1216
#include <rmm/device_buffer.hpp>
1317

1418
#include <rapidsmpf/error.hpp>
1519

1620
namespace rapidsmpf {
1721

1822
class BufferResource;
23+
class Event;
1924

2025
/// @brief Enum representing the type of memory.
2126
enum class MemoryType : int {
@@ -26,17 +31,81 @@ enum class MemoryType : int {
2631
/// @brief Array of all the different memory types.
2732
constexpr std::array<MemoryType, 2> MEMORY_TYPES{{MemoryType::DEVICE, MemoryType::HOST}};
2833

34+
namespace {
35+
/// @brief Helper for overloaded lambdas using std::visit.
36+
template <class... Ts>
37+
struct overloaded : Ts... {
38+
using Ts::operator()...;
39+
};
40+
/// @brief Explicit deduction guide
41+
template <class... Ts>
42+
overloaded(Ts...) -> overloaded<Ts...>;
43+
44+
} // namespace
45+
2946
/**
3047
* @brief Buffer representing device or host memory.
3148
*
3249
* @note The constructors are private, use `BufferResource` to construct buffers.
3350
* @note The memory type (e.g., host or device) is constant and cannot change during
3451
* the buffer's lifetime.
52+
* @note A buffer is a stream-ordered object; when passing to a library which is
53+
* not stream-aware, one must ensure that `is_ready` returns `true`, otherwise
54+
* behaviour is undefined.
3555
*/
3656
class Buffer {
3757
friend class BufferResource;
3858

3959
public:
60+
/**
61+
* @brief CUDA event to provide synchronization among set of chunks.
62+
*
63+
* This event is used to serve as a synchronization point for a set of chunks
64+
* given a user-specified stream.
65+
*
66+
* @note To prevent undefined behavior due to unfinished memory operations, events
67+
* should be used in the following cases, if any of the operations below was
68+
* performed *asynchronously with respect to the host*:
69+
* 1. Before addressing a device buffer's allocation.
70+
* 2. Before accessing a device buffer's data that has been copied from
71+
* any location, or that has been processed by a CUDA kernel.
72+
* 3. Before accessing a host buffer's data that has been copied from device,
73+
* or processed by a CUDA kernel.
74+
*/
75+
class Event {
76+
public:
77+
/**
78+
* @brief Construct a CUDA event for a given stream.
79+
*
80+
* @param stream CUDA stream used for device memory operations
81+
*/
82+
Event(rmm::cuda_stream_view stream);
83+
84+
/**
85+
* @brief Destructor for Event.
86+
*
87+
* Cleans up the CUDA event if one was created.
88+
*/
89+
~Event();
90+
91+
/**
92+
* @brief Check if the CUDA event has been completed.
93+
*
94+
* @return true if the event has been completed, false otherwise.
95+
*/
96+
[[nodiscard]] bool is_ready();
97+
98+
private:
99+
cudaEvent_t event_; ///< CUDA event used to track device memory allocation
100+
std::atomic<bool> done_{false
101+
}; ///< Cache of the event status to avoid unnecessary queries.
102+
mutable std::mutex mutex_; ///< Protects access to event_
103+
std::atomic<bool> destroying_{false
104+
}; ///< Flag to indicate destruction in progress
105+
std::atomic<int> active_readers_{0
106+
}; ///< Number of threads currently executing is_ready()
107+
};
108+
40109
/// @brief Storage type for the device buffer.
41110
using DeviceStorageT = std::unique_ptr<rmm::device_buffer>;
42111

@@ -48,15 +117,6 @@ class Buffer {
48117
*/
49118
using StorageT = std::variant<DeviceStorageT, HostStorageT>;
50119

51-
/// @brief Helper for overloaded lambdas for Storage types in StorageT
52-
template <class... Ts>
53-
struct overloaded : Ts... {
54-
using Ts::operator()...;
55-
};
56-
/// @brief Explicit deduction guide
57-
template <class... Ts>
58-
overloaded(Ts...) -> overloaded<Ts...>;
59-
60120
/**
61121
* @brief Access the underlying host memory buffer (const).
62122
*
@@ -112,7 +172,7 @@ class Buffer {
112172
*
113173
* @throws std::logic_error if the buffer is not initialized.
114174
*/
115-
MemoryType constexpr mem_type() const {
175+
[[nodiscard]] MemoryType constexpr mem_type() const {
116176
return std::visit(
117177
overloaded{
118178
[](const HostStorageT&) -> MemoryType { return MemoryType::HOST; },
@@ -122,8 +182,16 @@ class Buffer {
122182
);
123183
}
124184

125-
/// @brief Buffer has a move ctor but no copy or assign operator.
126-
Buffer(Buffer&&) = default;
185+
/**
186+
* @brief Check if the device memory operation has completed.
187+
*
188+
* @return true if the device memory operation has completed or no device
189+
* memory operation was performed, false if it is still in progress.
190+
*/
191+
[[nodiscard]] bool is_ready() const;
192+
193+
/// @brief Delete move and copy constructors and assignment operators.
194+
Buffer(Buffer&&) = delete;
127195
Buffer(Buffer const&) = delete;
128196
Buffer& operator=(Buffer& o) = delete;
129197
Buffer& operator=(Buffer&& o) = delete;
@@ -143,13 +211,20 @@ class Buffer {
143211
* @brief Construct a Buffer from device memory.
144212
*
145213
* @param device_buffer A unique pointer to a device buffer.
214+
* @param stream CUDA stream used for the device buffer allocation.
146215
* @param br Buffer resource for memory allocation.
216+
* @param event The shared event to use for the buffer.
147217
*
148218
* @throws std::invalid_argument if `device_buffer` is null.
149219
* @throws std::invalid_argument if `stream` or `br->mr` isn't the same used by
150220
* `device_buffer`.
151221
*/
152-
Buffer(std::unique_ptr<rmm::device_buffer> device_buffer, BufferResource* br);
222+
Buffer(
223+
std::unique_ptr<rmm::device_buffer> device_buffer,
224+
rmm::cuda_stream_view stream,
225+
BufferResource* br,
226+
std::shared_ptr<Event> event = nullptr
227+
);
153228

154229
/**
155230
* @brief Access the underlying host memory buffer.
@@ -184,7 +259,7 @@ class Buffer {
184259
/**
185260
* @brief Create a copy of this buffer using the same memory type.
186261
*
187-
* @param stream CUDA stream used for device memory operations.
262+
* @param stream CUDA stream used for the device buffer allocation and copy.
188263
* @return A unique pointer to a new Buffer containing the copied data.
189264
*/
190265
[[nodiscard]] std::unique_ptr<Buffer> copy(rmm::cuda_stream_view stream) const;
@@ -193,7 +268,7 @@ class Buffer {
193268
* @brief Create a copy of this buffer using the specified memory type.
194269
*
195270
* @param target The target memory type.
196-
* @param stream CUDA stream used for device memory operations.
271+
* @param stream CUDA stream used for device buffer allocation and copy.
197272
* @return A unique pointer to a new Buffer containing the copied data.
198273
*/
199274
[[nodiscard]] std::unique_ptr<Buffer> copy(
@@ -208,6 +283,8 @@ class Buffer {
208283
/// @brief The underlying storage host memory or device memory buffer (where
209284
/// applicable).
210285
StorageT storage_;
286+
/// @brief CUDA event used to track copy operations
287+
std::shared_ptr<Event> event_;
211288
};
212289

213290
} // namespace rapidsmpf

cpp/include/rapidsmpf/buffer/resource.hpp

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,15 @@ class BufferResource {
256256
* @brief Move device buffer data into a Buffer.
257257
*
258258
* @param data A unique pointer to the device buffer.
259+
* @param stream CUDA stream used for the data allocation, copy, and/or move.
260+
* @param event The event to use for the buffer.
259261
* @return A unique pointer to the resulting Buffer.
260262
*/
261-
std::unique_ptr<Buffer> move(std::unique_ptr<rmm::device_buffer> data);
263+
std::unique_ptr<Buffer> move(
264+
std::unique_ptr<rmm::device_buffer> data,
265+
rmm::cuda_stream_view stream,
266+
std::shared_ptr<Buffer::Event> event = nullptr
267+
);
262268

263269
/**
264270
* @brief Move a Buffer to the specified memory type.
@@ -267,7 +273,7 @@ class BufferResource {
267273
*
268274
* @param target The target memory type.
269275
* @param buffer The buffer to move.
270-
* @param stream CUDA stream for the operation.
276+
* @param stream CUDA stream used for the buffer allocation, copy, and/or move.
271277
* @param reservation The reservation to use for memory allocations.
272278
* @return A unique pointer to the moved Buffer.
273279
*
@@ -287,7 +293,7 @@ class BufferResource {
287293
* If and only if moving between different memory types will this perform a copy.
288294
*
289295
* @param buffer The buffer to move.
290-
* @param stream CUDA stream for the operation.
296+
* @param stream CUDA stream used for the buffer allocation, copy, and/or move.
291297
* @param reservation The reservation to use for memory allocations.
292298
* @return A unique pointer to the resulting device buffer.
293299
*
@@ -307,7 +313,7 @@ class BufferResource {
307313
* If and only if moving between different memory types will this perform a copy.
308314
*
309315
* @param buffer The buffer to move.
310-
* @param stream CUDA stream for the operation.
316+
* @param stream CUDA stream used for the buffer allocation, copy, and/or move.
311317
* @param reservation The reservation to use for memory allocations.
312318
* @return A unique pointer to the resulting host vector.
313319
*
@@ -328,7 +334,7 @@ class BufferResource {
328334
*
329335
* @param target The target memory type.
330336
* @param buffer The buffer to copy.
331-
* @param stream CUDA stream for the operation.
337+
* @param stream CUDA stream used for the buffer allocation and copy.
332338
* @param reservation The reservation to use for memory allocations.
333339
* @return A unique pointer to the new Buffer.
334340
*

cpp/include/rapidsmpf/communicator/communicator.hpp

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,12 @@ class Communicator {
426426
* @param rank The destination rank.
427427
* @param tag Message tag for identification.
428428
* @return A unique pointer to a `Future` representing the asynchronous operation.
429+
*
430+
* @warning The caller is responsible for ensuring the underlying `Buffer` allocation
431+
* and data are already valid before calling, for example, when a CUDA allocation
432+
* and/or copy are done asynchronously. Specifically, the caller should ensure
433+
* `Buffer::is_ready()` returns true before calling this function, if not, a
434+
* warning is printed and the application will terminate.
429435
*/
430436
[[nodiscard]] virtual std::unique_ptr<Future> send(
431437
std::unique_ptr<Buffer> msg, Rank rank, Tag tag
@@ -438,6 +444,12 @@ class Communicator {
438444
* @param tag Message tag for identification.
439445
* @param recv_buffer The receive buffer.
440446
* @return A unique pointer to a `Future` representing the asynchronous operation.
447+
*
448+
* @warning The caller is responsible to ensure the underlying `Buffer` allocation
449+
* is already valid before calling, for example, when a CUDA allocation
450+
* and/or copy are done asynchronously. Specifically, the caller should ensure
451+
* `Buffer::is_ready()` returns true before calling this function, if not, a
452+
* warning is printed and the application will terminate.
441453
*/
442454
[[nodiscard]] virtual std::unique_ptr<Future> recv(
443455
Rank rank, Tag tag, std::unique_ptr<Buffer> recv_buffer

cpp/include/rapidsmpf/shuffler/chunk.hpp

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
*/
55
#pragma once
66

7+
#include <atomic>
78
#include <memory>
9+
#include <mutex>
810
#include <sstream>
911
#include <vector>
1012

@@ -14,6 +16,7 @@
1416
#include <cudf/table/table.hpp>
1517

1618
#include <rapidsmpf/buffer/buffer.hpp>
19+
#include <rapidsmpf/communicator/communicator.hpp>
1720
#include <rapidsmpf/shuffler/partition.hpp>
1821

1922
namespace rapidsmpf::shuffler::detail {
@@ -272,6 +275,7 @@ class ChunkBatch {
272275
class Chunk {
273276
public:
274277
PartID const pid; ///< Partition ID that this chunk belongs to.
278+
275279
ChunkID const cid; ///< Unique ID of this chunk.
276280

277281
/// If not zero, the number of chunks of the partition expected to get from the
@@ -292,8 +296,6 @@ class Chunk {
292296
*
293297
* @param pid The ID of the partition this chunk is part of.
294298
* @param cid The ID of the chunk.
295-
* @param expected_num_chunks If not zero, the number of chunks of the partition
296-
* expected to get from the sending rank. Ignored when it is zero.
297299
* @param gpu_data_size If known, the size of the gpu data buffer (in bytes).
298300
* @param metadata The metadata of the packed `cudf::table` that makes up this
299301
* chunk.
@@ -303,7 +305,6 @@ class Chunk {
303305
Chunk(
304306
PartID pid,
305307
ChunkID cid,
306-
std::size_t expected_num_chunks,
307308
std::size_t gpu_data_size,
308309
std::unique_ptr<std::vector<uint8_t>> metadata,
309310
std::unique_ptr<Buffer> gpu_data
@@ -371,6 +372,43 @@ class Chunk {
371372
std::size_t max_nbytes = 512,
372373
rmm::cuda_stream_view stream = cudf::get_default_stream()
373374
) const;
375+
376+
/**
377+
* @brief Returns true if the chunk is ready for consumption.
378+
*
379+
* Checks that the gpu_data's CUDA event is ready, if gpu_data contains a valid
380+
* buffer. The CUDA event is used to synchronize the chunk's data to ensure
381+
* any allocation or copy (e.g., spilling) is complete before the chunk is
382+
* consumed. If expected_num_chunks is greater than 0, or gpu_data_size is 0,
383+
* the chunk is considered always ready as it should not have any CUDA data
384+
* to receive.
385+
*
386+
* @return true if the chunk is ready, false otherwise.
387+
*/
388+
[[nodiscard]] bool is_ready() const;
389+
390+
private:
391+
/**
392+
* @brief Construct a new chunk of a partition.
393+
*
394+
* @param pid The ID of the partition this chunk is part of.
395+
* @param cid The ID of the chunk.
396+
* @param expected_num_chunks If not zero, the number of chunks of the partition
397+
* expected to get from the sending rank. Ignored when it is zero.
398+
* @param gpu_data_size If known, the size of the gpu data buffer (in bytes).
399+
* @param metadata The metadata of the packed `cudf::table` that makes up this
400+
* chunk.
401+
* @param gpu_data The gpu_data of the packed `cudf::table` that makes up this
402+
* chunk.
403+
*/
404+
Chunk(
405+
PartID pid,
406+
ChunkID cid,
407+
std::size_t expected_num_chunks,
408+
std::size_t gpu_data_size,
409+
std::unique_ptr<std::vector<uint8_t>> metadata,
410+
std::unique_ptr<Buffer> gpu_data
411+
);
374412
};
375413

376414
/**

cpp/include/rapidsmpf/shuffler/postbox.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,11 @@ class PostBox {
8383
std::unordered_map<ChunkID, Chunk> extract_by_key(KeyType key);
8484

8585
/**
86-
* @brief Extracts all chunks from the PostBox.
86+
* @brief Extracts all ready chunks from the PostBox.
8787
*
88-
* @return A vector of all chunks in the PostBox.
88+
* @return A vector of all ready chunks in the PostBox.
8989
*/
90-
std::vector<Chunk> extract_all();
90+
std::vector<Chunk> extract_all_ready();
9191

9292
/**
9393
* @brief Checks if the PostBox is empty.

0 commit comments

Comments
 (0)