
Chunk with multiple messages #251


Merged

Conversation

nirandaperera
Contributor

@nirandaperera nirandaperera commented May 6, 2025

Chunk with multiple messages. This PR only moves the existing Chunk class to the new impl and it would only have 1 message in it.

This class has two buffers:

  • metadata_: The metadata buffer that contains information about the messages in the chunk and the concatenated metadata of the messages.
  • data_: The data buffer that contains the concatenated data of the messages in the chunk.

All the chunk information will be encoded to the metadata_ buffer as follows.
The metadata_ buffer uses the following format:

  • chunk_id: uint64_t, ID of the chunk
  • n_elements: size_t, Number of messages in the chunk
  • [partition_ids]: vector<PartID>, Partition IDs of the messages, size = n_elements
  • [expected_num_chunks]: vector<size_t>, Expected number of chunks of the messages, size = n_elements
  • [meta_offsets]: vector<uint32_t>, Offsets (excluding 0) of the metadata sizes of the messages, size = n_elements
  • [data_offsets]: vector<uint64_t>, Offsets (excluding 0) of the data sizes of the messages, size = n_elements
  • [concat_metadata]: vector<uint8_t>, Concatenated metadata of the messages, size = meta_offsets[n_elements - 1]

For a chunk with N messages and M bytes of concatenated metadata, the size of the metadata_ buffer is sizeof(ChunkID) + sizeof(size_t) + N * (sizeof(PartID) + sizeof(size_t) + sizeof(uint32_t) + sizeof(uint64_t)) + M = 16 + 24N + M bytes.

For a chunk with a single control message, the size of the metadata_ buffer is sizeof(ChunkID) + sizeof(PartID) + 2*sizeof(size_t) + sizeof(uint32_t) + sizeof(uint64_t) = 40 bytes.

For a chunk with a single message with M bytes of metadata, the size of the metadata_ buffer is sizeof(ChunkID) + sizeof(PartID) + 2*sizeof(size_t) + sizeof(uint32_t) + sizeof(uint64_t) + M = 40 + M bytes.
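
As an illustration (not part of this PR), the size formula above can be written out directly in code; the ChunkID and PartID widths below are assumptions inferred from the byte counts in these formulas:

#include <cstddef>
#include <cstdint>

// Illustrative only: type widths assumed from the size formulas above.
using ChunkID = std::uint64_t;  // 8 bytes
using PartID  = std::uint32_t;  // 4 bytes

// Size of the metadata_ buffer for a chunk of `n` messages whose concatenated
// metadata occupies `concat_meta_bytes` bytes.
constexpr std::size_t metadata_buffer_size(std::size_t n, std::size_t concat_meta_bytes) {
    return sizeof(ChunkID) + sizeof(std::size_t)                  // chunk_id, n_elements
         + n * (sizeof(PartID) + sizeof(std::size_t)              // partition_ids, expected_num_chunks
                + sizeof(std::uint32_t) + sizeof(std::uint64_t))  // meta_offsets, data_offsets
         + concat_meta_bytes;                                     // concatenated metadata
}

// On an LP64 platform (sizeof(size_t) == 8):
static_assert(metadata_buffer_size(1, 0) == 40, "single control message");
static_assert(metadata_buffer_size(1, 100) == 140, "single message with 100 B of metadata");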

Signed-off-by: niranda perera <[email protected]>
@pentschev
Member

If you're merging branch-25.08 into your PR you need to retarget it to branch-25.08 as well. Was that intended @nirandaperera ?

@nirandaperera nirandaperera force-pushed the multi-packed-data-chunk branch from fd59be4 to aba10c0 May 6, 2025 21:01
@nirandaperera
Contributor Author

If you're merging branch-25.08 into your PR you need to retarget it to branch-25.08 as well. Was that intended @nirandaperera ?

Thanks @pentschev. It was a mistake. I force-pushed the changes now.

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
@nirandaperera nirandaperera added breaking Introduces a breaking change improvement Improves an existing functionality labels May 6, 2025
@nirandaperera
Contributor Author

@wence- @madsbk this PR has the "new" Chunk API, with scaffolding for housing multiple messages. I didn't rename it to Chunk ATM, because I felt the API is cleaner to review like this. I will replace Chunk once this comes out of draft.

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
/**
* @brief Chunk with multiple messages.
*
* This class will have two buffers:
Contributor

Suggested change
* This class will have two buffers:
* This class has two buffers:

* - data_: The data buffer that contains the concatenated data of the messages in the
* chunk.
*
* The metadata_ buffer will have the following format:
Contributor

Suggested change
* The metadata_ buffer will have the following format:
* The metadata_ buffer has the following format:

Comment on lines 45 to 47
* - [psum_meta]: std::vector<uint32_t>, Prefix sums (excluding 0) of the metadata
* sizes of the messages, size = n_elements
* - [psum_data]: std::vector<uint64_t>, Prefix sums (excluding 0) of the data sizes of
Contributor

Can we call this something like metadata_offsets and data_offsets respectively?
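
For context, a minimal sketch (not from the PR) of how per-message sizes fall out of such offsets, using the member names discussed here:

// Sketch only: with offsets stored as prefix sums that omit the leading 0,
// message i's metadata/data size is the difference of adjacent entries.
inline uint32_t metadata_size(size_t i) const {
    return meta_offsets_[i] - (i == 0 ? 0 : meta_offsets_[i - 1]);
}

inline uint64_t data_size(size_t i) const {
    return data_offsets_[i] - (i == 0 ? 0 : data_offsets_[i - 1]);
}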

* @return The number of messages in the chunk.
*/
inline size_t n_messages() const {
    return *reinterpret_cast<size_t*>(metadata_->data() + sizeof(ChunkID));
Contributor

All these reinterpret_cast type-punning approaches break strict-aliasing rules unfortunately.

What we should do is:

  1. (C++ 20) use std::bit_cast (but it's messy because we're carrying around std::byte and bit_cast takes values, not pointer + size).
  2. Use memcpy (the compiler will optimise this)

So, this would be (for example):

inline size_t n_messages() const {
    size_t result;
    memcpy(&result, metadata_->data() + sizeof(ChunkID), sizeof(result));
    return result;
}
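
The same pattern generalizes to any header field; a small helper (a sketch, not code from this PR) keeps the memcpy in one place:

#include <cstddef>
#include <cstring>
#include <type_traits>

// Sketch: read a trivially copyable T at `offset` in a byte buffer without
// type punning; compilers typically lower this memcpy to a single load.
// Assumes the buffer is carried around as std::byte, as mentioned above.
template <typename T>
T read_at(std::byte const* buf, std::size_t offset) {
    static_assert(std::is_trivially_copyable_v<T>);
    T value;
    std::memcpy(&value, buf + offset, sizeof(T));
    return value;
}

// e.g. auto n = read_at<std::size_t>(metadata_->data(), sizeof(ChunkID));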

Contributor Author

I see. TIL. Sure, I will change to memcpy.

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
@nirandaperera
Contributor Author

/ok to test

Signed-off-by: niranda perera <[email protected]>
Signed-off-by: niranda perera <[email protected]>
@nirandaperera nirandaperera requested a review from wence- May 8, 2025 22:46
Member

@madsbk madsbk left a comment

Overall looks good!
@nirandaperera, let's prioritize getting full chunk support. I think it will have a significant impact!

@nirandaperera nirandaperera marked this pull request as ready for review May 9, 2025 15:16
@nirandaperera nirandaperera requested a review from a team as a code owner May 9, 2025 15:16
Signed-off-by: niranda perera <[email protected]>
nirandaperera and others added 3 commits May 13, 2025 13:52
@nirandaperera nirandaperera requested a review from madsbk May 13, 2025 20:53
Comment on lines 160 to 161
assert(!data_offsets_.empty());
assert(!is_control_message(i));
Contributor

question: Should we use RAPIDSMPF_EXPECTS here?

Contributor Author

I was previously thinking about populating data_offsets_ only when there are data messages, but ended up populating data_offsets_ and meta_offsets_ for all messages. So, we can remove these asserts.

Contributor

@wence- wence- left a comment

I think we can tidy some things up and remove TODOs by ensuring Chunks obey their invariants by construction.

inline size_t concat_metadata_size() const {
    assert(metadata_);
    assert(!meta_offsets_.empty());
    assert(meta_offsets_[n_messages() - 1] == metadata_->size());
Contributor

These invariants are (or should be) enforced by the constructor, so why do we need to assert them here?

Contributor Author

fair point. I was mostly using the asserts to prevent me from shooting myself in the foot 😉 I'll remove these

);

ChunkID const chunk_id_; ///< The ID of the chunk.
size_t const n_messages_; ///< The number of messages in the chunk.
Contributor

question: this information needs to be sent over the wire, but I think it redundantly encodes meta_offsets_.size(), is that right? If yes, should we remove it?

Contributor Author

I agree. We can remove Chunk::n_messages_ and use the size of either of these vectors.

       std::vector<PartID> part_ids,
        std::vector<size_t> expected_num_chunks,
        std::vector<uint32_t> meta_offsets,
        std::vector<uint64_t> data_offsets,

Comment on lines 25 to 32
    : chunk_id_{chunk_id},
      n_messages_{n_messages},
      part_ids_{std::move(part_ids)},
      expected_num_chunks_{std::move(expected_num_chunks)},
      meta_offsets_{std::move(meta_offsets)},
      data_offsets_{std::move(data_offsets)},
      metadata_{std::move(metadata)},
      data_{std::move(data)} {}
Contributor

suggestions: Let us do all the validation of the format of the Chunk here. Then we don't need assertions scattered around the rest of the code, since it will not be possible to construct an invalid chunk.
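
A rough sketch of what construction-time validation could check (illustrative names; in the real code RAPIDSMPF_EXPECTS, mentioned earlier in this review, would likely replace the throws):

#include <algorithm>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Sketch only: the invariants a Chunk constructor could enforce up front so
// that accessors don't need scattered asserts.
inline void validate_chunk_format(std::size_t n_part_ids,
                                  std::size_t n_expected_num_chunks,
                                  std::vector<std::uint32_t> const& meta_offsets,
                                  std::vector<std::uint64_t> const& data_offsets,
                                  std::size_t concat_metadata_size) {
    auto const n = meta_offsets.size();
    if (n == 0)
        throw std::invalid_argument("a chunk must contain at least one message");
    if (n_part_ids != n || n_expected_num_chunks != n || data_offsets.size() != n)
        throw std::invalid_argument("per-message vectors must have equal length");
    if (!std::is_sorted(meta_offsets.begin(), meta_offsets.end())
        || !std::is_sorted(data_offsets.begin(), data_offsets.end()))
        throw std::invalid_argument("offsets must be non-decreasing prefix sums");
    if (meta_offsets.back() != concat_metadata_size)
        throw std::invalid_argument("metadata offsets must end at the concatenated metadata size");
}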

Comment on lines 131 to 132
* the message is a data message, the buffers will be moved to the new ChunkBatch.
* Otherwise a new ChunkBatch will be created by copying data.
Contributor

nit: Not a ChunkBatch any more.

Comment on lines 171 to 174
// For each message, validate the metadata and data sizes
auto const* psum_meta = reinterpret_cast<uint32_t const*>(
    serialized_buf.data() + sizeof(ChunkID) + sizeof(size_t)
    + n_messages * (sizeof(PartID) + sizeof(size_t))
Contributor

And this.

);
auto const* psum_data = reinterpret_cast<uint64_t const*>(psum_meta + n_messages);
Contributor

And this.
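
In line with the earlier memcpy suggestion, these arrays could be copied out of the serialized buffer rather than aliased in place (a sketch reusing the names from the excerpt above; not the PR's code):

// Sketch: copy the offset arrays instead of reinterpret_cast'ing into the
// serialized buffer, keeping strict aliasing intact.
std::vector<uint32_t> meta_offsets(n_messages);
std::vector<uint64_t> data_offsets(n_messages);

auto const* base = serialized_buf.data() + sizeof(ChunkID) + sizeof(size_t)
                   + n_messages * (sizeof(PartID) + sizeof(size_t));
std::memcpy(meta_offsets.data(), base, n_messages * sizeof(uint32_t));
std::memcpy(data_offsets.data(), base + n_messages * sizeof(uint32_t),
            n_messages * sizeof(uint64_t));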

@@ -14,9 +14,16 @@ namespace rapidsmpf::shuffler::detail {

template <typename KeyType>
void PostBox<KeyType>::insert(Chunk&& chunk) {
    // check if all partition IDs in the chunk map to the same key
    KeyType key = key_map_fn_(chunk.part_id(0));
Contributor

question: I guess by construction a chunk always contains at least one piece?

Contributor Author

yes

if (chunk.gpu_data) {
statistics_->add_bytes_stat("shuffle-payload-send", chunk.gpu_data->size);
statistics_->add_bytes_stat("shuffle-payload-recv", chunk.gpu_data->size);
// TODO: Guarantee that all messages in the chunk map to the same key (rank).
Contributor

As above, do this in the constructor of Chunk then you don't need to litter the rest of the code with checks.

Contributor Author

Are you suggesting that we add a mapping function to the Chunk ctor?
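
For illustration only (this wasn't settled in the review), passing a mapping function at construction could look roughly like this, with hypothetical names:

#include <stdexcept>
#include <vector>

// Hypothetical sketch: verify at construction time that every message's
// partition maps to the same key/rank under the supplied mapping function.
// Assumes part_ids is non-empty, per the "at least one message" invariant above.
template <typename PartID, typename KeyMapFn>
void check_single_key(std::vector<PartID> const& part_ids, KeyMapFn&& key_map_fn) {
    auto const key = key_map_fn(part_ids.front());
    for (auto const& pid : part_ids) {
        if (key_map_fn(pid) != key)
            throw std::invalid_argument("all messages in a chunk must map to the same key");
    }
}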

@nirandaperera nirandaperera requested a review from wence- May 14, 2025 23:21
Member

@madsbk madsbk left a comment

Final suggestions

@madsbk
Member

madsbk commented May 30, 2025

@wence- do you have anything else?

Contributor

@wence- wence- left a comment

I think there are still perhaps a few places where the invariants of chunks (e.g. that all partitions in a chunk map to the same ID) could be enforced at construction. But I think that can be done in a followup, since it requires pushing various additional information like the partition->rank mapping function through.

@nirandaperera nirandaperera changed the base branch from branch-25.06 to branch-25.08 May 30, 2025 17:38
@nirandaperera
Contributor Author

/merge

@rapids-bot rapids-bot bot merged commit 9ea7d2a into rapidsai:branch-25.08 May 30, 2025
41 checks passed
rapids-bot bot pushed a commit that referenced this pull request Jun 25, 2025
This PR adds multi-packed data for the shuffler. 

Closes #145 

Depends on #251 #271

## Perf Analysis 

Weak scaling analysis of a 4 GB/rank shuffle (pre-hash-partitioned data, on the PDX cluster). Concatenation shows a significant performance improvement, despite creating multiple copies of data.

| out_parts | in_parts | nranks | global throughput GiB/s (new) | global throughput GiB/s (old) | local throughput GiB/s (new) | local throughput GiB/s (old) | time s (new) | time s (old) | time change % | local throughput change % | global throughput change % |
|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|:---:|
| 128 | 8 | 2 | 174.20 | 123.78 | 87.10 | 61.89 | 0.0500 | 0.0650 | 23.07 | 40.74 | 40.73 |
| 128 | 8 | 4 | 318.13 | 136.83 | 79.53 | 34.21 | 0.0500 | 0.1175 | 57.44 | 132.49 | 132.49 |
| 128 | 8 | 8 | 440.81 | 271.68 | 55.10 | 33.96 | 0.0737 | 0.1187 | 37.89 | 62.24 | 62.25 |

[notebook link](https://colab.research.google.com/drive/1tgtn-dTw_YB9yfBQNs5RN-EJccAwBIm5?authuser=1#scrollTo=YeM0d1PVQ4UR)

![image](https://github.com/user-attachments/assets/9c4e7802-2591-4fa9-9eec-6554b0a9f051)

![image](https://github.com/user-attachments/assets/9407af38-4ecd-4dd4-a6d4-18fab8b895b2)

Authors:
  - Niranda Perera (https://github.com/nirandaperera)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #291