Add ChunkBatch interface to aggregate small messages before sending #231


Open: nirandaperera wants to merge 31 commits into branch-25.06 from batch-input-partitions

Changes from all commits (31 commits)
8b655ac  wip (nirandaperera, Mar 31, 2025)
73a76ad  Merge branch 'branch-25.06' of github.com:rapidsai/rapids-multi-gpu i… (nirandaperera, Mar 31, 2025)
bd2093d  wip (nirandaperera, Apr 3, 2025)
25f6117  wip (nirandaperera, Apr 7, 2025)
5e5fbbf  Merge branch 'branch-25.06' of github.com:rapidsai/rapids-multi-gpu i… (nirandaperera, Apr 7, 2025)
a8ee7ba  chunk batch definition (nirandaperera, Apr 8, 2025)
a05df4e  running precommit (nirandaperera, Apr 8, 2025)
960bf2e  adding chunk visitor (nirandaperera, Apr 8, 2025)
3080936  more comments (nirandaperera, Apr 8, 2025)
0f0468a  addressign comments (nirandaperera, Apr 8, 2025)
3c7695e  Merge branch 'branch-25.06' of github.com:rapidsai/rapids-multi-gpu i… (nirandaperera, Apr 8, 2025)
7f11cb2  fix build (nirandaperera, Apr 8, 2025)
62062ed  adding test (nirandaperera, Apr 9, 2025)
1dfeda1  adding more comments (nirandaperera, Apr 9, 2025)
a90bf30  Merge branch 'branch-25.06' of github.com:rapidsai/rapids-multi-gpu i… (nirandaperera, Apr 9, 2025)
e0e6449  more tests (nirandaperera, Apr 9, 2025)
4a871c3  using thrust equal (nirandaperera, Apr 10, 2025)
0507d05  Merge branch 'branch-25.06' of github.com:rapidsai/rapids-multi-gpu i… (nirandaperera, Apr 10, 2025)
708c67c  precommit (nirandaperera, Apr 10, 2025)
9efc2e8  adding fwd iterator (nirandaperera, Apr 12, 2025)
95f84ad  Merge branch 'branch-25.06' of github.com:rapidsai/rapids-multi-gpu i… (nirandaperera, Apr 15, 2025)
057917d  merge conflicts (nirandaperera, Apr 15, 2025)
1324063  running precommit (nirandaperera, Apr 15, 2025)
32f1199  special case for batches with 1 payload (nirandaperera, Apr 17, 2025)
9f236af  addressing comments (nirandaperera, Apr 17, 2025)
29deb42  Merge branch 'branch-25.06' into batch-input-partitions (nirandaperera, Apr 17, 2025)
bfb5578  Merge branch 'branch-25.06' of github.com:rapidsai/rapidsmpf into bat… (nirandaperera, Apr 24, 2025)
f868416  running precommit (nirandaperera, Apr 24, 2025)
b17cee4  addressing comments (nirandaperera, Apr 24, 2025)
bc2895a  Merge branch 'branch-25.06' of github.com:rapidsai/rapidsmpf into bat… (nirandaperera, Apr 28, 2025)
f9906cd  precommit (nirandaperera, Apr 28, 2025)

48 changes: 46 additions & 2 deletions cpp/include/rapidsmpf/buffer/buffer.hpp
@@ -5,6 +5,7 @@
#pragma once

#include <array>
#include <cstddef>
#include <memory>
#include <variant>
#include <vector>
@@ -17,6 +18,7 @@ namespace rapidsmpf {

class BufferResource;


/// @brief Enum representing the type of memory.
enum class MemoryType : int {
DEVICE = 0, ///< Device memory
@@ -64,7 +66,7 @@ class Buffer {
*
* @throws std::logic_error if the buffer does not manage host memory.
*/
[[nodiscard]] constexpr std::unique_ptr<std::vector<uint8_t>> const& host() const {
[[nodiscard]] constexpr HostStorageT const& host() const {
if (const auto* ref = std::get_if<HostStorageT>(&storage_)) {
return *ref;
} else {
@@ -79,7 +81,7 @@
*
* @throws std::logic_error if the buffer does not manage device memory.
*/
[[nodiscard]] constexpr std::unique_ptr<rmm::device_buffer> const& device() const {
[[nodiscard]] constexpr DeviceStorageT const& device() const {
if (const auto* ref = std::get_if<DeviceStorageT>(&storage_)) {
return *ref;
} else {
@@ -122,6 +124,48 @@ class Buffer {
);
}

/**
* @brief Copy a slice of the buffer to a new buffer.
*
* @param offset Offset in bytes from the start of the buffer.
* @param length Length in bytes of the slice.
* @param stream CUDA stream to use for the copy.
* @returns A new buffer containing the copied slice.
*/
[[nodiscard]] std::unique_ptr<Buffer> copy_slice(
std::ptrdiff_t offset, std::ptrdiff_t length, rmm::cuda_stream_view stream
) const;

/**
* @brief Copy a slice of the buffer to a new buffer.
*
* @param target Memory type of the new buffer.
* @param offset Offset in bytes from the start of the buffer.
* @param length Length in bytes of the slice.
* @param stream CUDA stream to use for the copy.
* @returns A new buffer containing the copied slice.
*/
[[nodiscard]] std::unique_ptr<Buffer> copy_slice(
MemoryType target,
std::ptrdiff_t offset,
std::ptrdiff_t length,
rmm::cuda_stream_view stream
) const;

/**
* @brief Copy the buffer to a destination buffer with a given offset.
*
* @param dest Destination buffer.
* @param offset Offset of the destination buffer.
* @param stream CUDA stream to use for the copy.
* @returns Number of bytes written to the destination buffer.
*
* @throws std::logic_error if copy violates the bounds of the destination buffer.
*/
[[nodiscard]] std::ptrdiff_t copy_to(
Buffer& dest, std::ptrdiff_t offset, rmm::cuda_stream_view stream
) const;
Comment on lines +155 to +167 (Contributor):

Are these slice-based interfaces the best approach, or should we accept span-like things? That is, we could have a slice method that returns a Span over a Buffer, and then copy_to could accept a Span& dest; this would push validation of offsets to the Span construction, etc.
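To make that suggestion concrete, here is a minimal standalone sketch of the idea, using a hypothetical ByteSpan over plain host bytes rather than the PR's Buffer type or cuda::std::span; the point is only that bounds get validated once, when the view is constructed, so copy routines just compare lengths:

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical non-owning window over a byte buffer. Offsets are validated
// here, at construction, instead of inside every copy routine.
class ByteSpan {
  public:
    ByteSpan(std::vector<std::uint8_t>& buf, std::size_t offset, std::size_t length) {
        if (offset + length > buf.size()) {
            throw std::out_of_range("span exceeds buffer bounds");
        }
        data_ = buf.data() + offset;
        length_ = length;
    }

    [[nodiscard]] std::uint8_t* data() const noexcept { return data_; }
    [[nodiscard]] std::size_t size() const noexcept { return length_; }

  private:
    std::uint8_t* data_;
    std::size_t length_;
};

// A copy routine that takes its destination as a span only has to check lengths.
inline std::size_t copy_to(std::vector<std::uint8_t> const& src, ByteSpan dest) {
    if (src.size() > dest.size()) {
        throw std::out_of_range("destination span too small");
    }
    std::copy(src.begin(), src.end(), dest.data());
    return src.size();
}
```

For device memory, cuda::std::span could play the same role, with the CUDA stream still passed alongside the span.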


/// @brief Buffer has a move ctor but no copy or assign operator.
Buffer(Buffer&&) = default;
Buffer(Buffer const&) = delete;
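For reference, a hedged sketch of how the copy_slice and copy_to additions above could compose; append_slice is a hypothetical helper, and both buffers are assumed to have been allocated through a BufferResource elsewhere:

```cpp
#include <cstddef>

#include <rmm/cuda_stream_view.hpp>

#include <rapidsmpf/buffer/buffer.hpp>

// Hypothetical helper: copy the window [src_offset, src_offset + length) out of
// `src` and write it into `staging` at `write_offset`, returning the next free
// offset in `staging`.
inline std::ptrdiff_t append_slice(
    rapidsmpf::Buffer const& src,
    std::ptrdiff_t src_offset,
    std::ptrdiff_t length,
    rapidsmpf::Buffer& staging,
    std::ptrdiff_t write_offset,
    rmm::cuda_stream_view stream
) {
    // The overload without a MemoryType target keeps the slice in the same
    // memory type as `src`; copy_slice(MemoryType::HOST, ...) would instead
    // pull it into host memory.
    auto slice = src.copy_slice(src_offset, length, stream);

    // copy_to returns the number of bytes written and throws std::logic_error
    // if the write would run past the end of `staging`.
    return write_offset + slice->copy_to(staging, write_offset, stream);
}
```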
18 changes: 17 additions & 1 deletion cpp/include/rapidsmpf/shuffler/chunk.hpp
@@ -65,7 +65,10 @@ class Chunk {
);

/**
* @brief Construct a new chunk of a partition.
* @brief Construct a new chunk with no data.
*
* This is used to indicate the number of chunks that were sent for a particular
* partition.
*
* @param pid The ID of the partition this chunk is part of.
* @param cid The ID of the chunk.
@@ -83,6 +86,8 @@
/// If not zero, the number of chunks of the partition expected to get from the
/// sending rank. Ignored when it is zero.
std::size_t expected_num_chunks;
/// If known, the size of the metadata buffer (in bytes).
std::size_t metadata_size;
/// If known, the size of the gpu data buffer (in bytes).
std::size_t gpu_data_size;
};
@@ -94,6 +99,17 @@
*/
[[nodiscard]] std::unique_ptr<std::vector<uint8_t>> to_metadata_message() const;

/**
* @brief Serializes this chunk into a given metadata message buffer.
*
* @param msg The metadata message as a serialized byte vector.
* @param offset The offset in the message buffer to start writing.
* @returns The number of bytes written to the message buffer.
*/
[[nodiscard]] std::ptrdiff_t to_metadata_message(
std::vector<uint8_t>& msg, std::ptrdiff_t offset
Comment (Contributor):

Here, if we accepted a cuda::std::span (C++20 but backported to C++14) over the message, we wouldn't need the offset argument separately, I think.

) const;

/**
* @brief Construct a new chunk from a metadata message.
*
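To illustrate how the offset-based to_metadata_message overload supports the batching this PR is about, here is a rough sketch that packs several chunks' metadata back-to-back into a single message buffer; pack_metadata, the ChunkT template parameter, and the precomputed per-chunk sizes are placeholders rather than the PR's actual ChunkBatch API:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Serialize a batch of chunks into one contiguous metadata message so a single
// buffer can be sent instead of many small ones. ChunkT stands in for the Chunk
// class from cpp/include/rapidsmpf/shuffler/chunk.hpp; `sizes` holds the number
// of bytes each chunk will write (assumed to be known up front, one entry per
// chunk in `chunks`).
template <typename ChunkT>
std::vector<std::uint8_t> pack_metadata(
    std::vector<ChunkT> const& chunks, std::vector<std::ptrdiff_t> const& sizes
) {
    std::ptrdiff_t total = 0;
    for (auto size : sizes) {
        total += size;
    }

    std::vector<std::uint8_t> msg(static_cast<std::size_t>(total));
    std::ptrdiff_t offset = 0;
    for (auto const& chunk : chunks) {
        // Each call writes one chunk's serialized form at `offset` and returns
        // the number of bytes written.
        offset += chunk.to_metadata_message(msg, offset);
    }
    return msg;
}
```

A receiver could then walk the same buffer chunk by chunk via the "construct a new chunk from a metadata message" path shown above, assuming each chunk's byte count can be recovered from the serialized headers.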