Add ChunkBatch interface to aggregate small messages before sending #231
base: branch-25.06
Conversation
/ok to test
@nirandaperera, there was an error processing your request: See the following link for more information: https://docs.gha-runners.nvidia.com/cpr/e/1/
/ok to test bc2895a
Signed-off-by: niranda perera <[email protected]>
/ok to test f9906cd
Updated ChunkBatch interface to aggregate small messages before sending
Partial review
/**
 * @brief Copy the buffer to a destination buffer with a given offset.
 *
 * @param dest Destination buffer.
 * @param offset Offset of the destination buffer.
 * @param stream CUDA stream to use for the copy.
 * @returns Number of bytes written to the destination buffer.
 *
 * @throws std::logic_error if copy violates the bounds of the destination buffer.
 */
[[nodiscard]] std::ptrdiff_t copy_to(
    Buffer& dest, std::ptrdiff_t offset, rmm::cuda_stream_view stream
) const;
Are these slice-based interfaces the best approach, or should we accept span-like things? That is, we could have a slice method that returns a Span over a Buffer, and then copy_to could accept a Span& dest; this would push validation of offsets to the Span construction, etc.
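A minimal sketch of what this span-based alternative might look like. The `Span` type, the `slice` signature, and this `Buffer` are illustrative assumptions, not the actual rapidsmpf API; the point is only that bounds checking happens once, at view construction, and copy_to loses its offset parameter.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

// Hypothetical non-owning view over a byte range.
struct Span {
    std::uint8_t* data;
    std::ptrdiff_t size;
};

// Hypothetical host-only stand-in for Buffer.
struct Buffer {
    std::vector<std::uint8_t> bytes;

    // Bounds are validated once, when the Span is constructed.
    Span slice(std::ptrdiff_t offset, std::ptrdiff_t length) {
        if (offset < 0 || length < 0
            || offset + length > static_cast<std::ptrdiff_t>(bytes.size())) {
            throw std::logic_error("slice out of bounds");
        }
        return Span{bytes.data() + offset, length};
    }

    // copy_to no longer needs an offset argument or its own bounds checks.
    std::ptrdiff_t copy_to(Span dest) const {
        auto n = std::min<std::ptrdiff_t>(
            dest.size, static_cast<std::ptrdiff_t>(bytes.size()));
        std::memcpy(dest.data, bytes.data(), static_cast<std::size_t>(n));
        return n;
    }
};
```

A call site would then read `src.copy_to(dst.slice(2, 3))`, with any out-of-range slice throwing before the copy starts.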
 * @returns The number of bytes written to the message buffer.
 */
[[nodiscard]] std::ptrdiff_t to_metadata_message(
    std::vector<uint8_t>& msg, std::ptrdiff_t offset
Here, if we accepted a cuda::std::span (C++20, but backported to C++14) over the message, we wouldn't need the offset argument separately, I think.
reinterpret_cast<Chunk::MetadataMessageHeader const*>(
    metadata_buffer_->data() + metadata_offset
);
question: I think accessing the metadata through this pointer might be undefined behaviour if metadata_offset % alignof(Chunk::MetadataMessageHeader) is not zero. Can you assuage my doubts?
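To illustrate the concern and the usual remedy: dereferencing a reinterpret_cast'ed pointer is undefined behaviour when the address is not suitably aligned for the target type, whereas memcpy-ing the bytes into a properly aligned local is always well-defined. The `MetadataMessageHeader` below is a simplified stand-in, not the real definition.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>

// Simplified stand-in for Chunk::MetadataMessageHeader.
struct MetadataMessageHeader {
    std::uint32_t id;
    std::uint64_t gpu_data_size;
};

// Safe alternative to the reinterpret_cast: copy the bytes out.
// std::memcpy is valid regardless of the alignment of buf + offset.
MetadataMessageHeader read_header(std::uint8_t const* buf, std::size_t offset) {
    MetadataMessageHeader out;
    std::memcpy(&out, buf + offset, sizeof(out));
    return out;
}
```

Compilers typically lower this memcpy to the same loads as the cast on architectures that tolerate misaligned access, so it costs nothing where the cast would have "worked" anyway.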
 * @param visitor visitor function
 */
template <typename VisitorFn>
void visit_chunk_data(VisitorFn visitor) const {
I think I don't really like this interface and would prefer to only have the iterator-like access. WDYT?
/**
 * @brief Forward iterator of chunks in the chunk batch.
 */
class ChunkForwardIterator {
I think I would prefer to implement operator[] and .at on the ChunkBatch class so that we could just have a RandomAccessIterator. I think this might require a few changes in the metadata we store in a ChunkBatch (basically, we need to keep the exclusive scan of the offsets around).
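A sketch of why the exclusive scan enables this (names here are illustrative, not the rapidsmpf types): once the per-chunk offsets are precomputed, locating chunk i is O(1) pointer arithmetic, which is exactly what operator[]/.at and a RandomAccessIterator need.

```cpp
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical index kept alongside a ChunkBatch's metadata.
struct ChunkIndex {
    std::vector<std::size_t> offsets;  // exclusive scan of chunk sizes

    explicit ChunkIndex(std::vector<std::size_t> const& sizes)
        : offsets(sizes.size()) {
        // offsets[i] = sizes[0] + ... + sizes[i-1]; offsets[0] = 0.
        std::exclusive_scan(sizes.begin(), sizes.end(), offsets.begin(),
                            std::size_t{0});
    }

    // O(1) lookup of where chunk i starts in the buffer.
    std::size_t offset_of(std::size_t i) const { return offsets.at(i); }
};
```

Without the scan, reaching chunk i requires walking all preceding headers, which is why only a forward iterator is possible today.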
if (mem_type() == target) {
    return copy_slice(offset, length, stream);
nit: This is why I don't really like these "C-like" APIs. It's very easy to switch up offset and length since they are the same type.
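One common way to harden such signatures, sketched with hypothetical wrapper names (not part of the rapidsmpf API): wrap each parameter in a distinct strong type, so an accidental argument swap becomes a compile error instead of a silent bug.

```cpp
#include <cstddef>

// Hypothetical strong typedefs distinguishing the two parameters.
struct Offset { std::ptrdiff_t value; };
struct Length { std::ptrdiff_t value; };

// Callers must write slice_end(Offset{4}, Length{3});
// slice_end(Length{3}, Offset{4}) no longer compiles.
std::ptrdiff_t slice_end(Offset offset, Length length) {
    return offset.value + length.value;
}
```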
Consider implementing the Buffer slicing (or operator[]/.at support) in a standalone PR with its own testing?
/**
 * @brief The structure of the batch header.
 * @note This is allocated at the front of the the metadata buffer.
Suggested change:
- * @note This is allocated at the front of the the metadata buffer.
+ * @note This is allocated at the front of the metadata buffer.
// visit chunk data and verify if the given buffers adhere to the format
size_t visited_metadata_size = batch_header_size;
size_t visited_payload_size = 0;
batch.visit_chunk_data([&](Chunk::MetadataMessageHeader const* chunk_header,
Could we use ChunkForwardIterator here?
// reserve for the chunks size
ret.reserve(ret.size() + chunks.size());
This is quadratic since you're not growing geometrically, so you have to do num_pigeonhole_entries * num_chunks_per_pigeonhole allocations. Either compute the total size up-front in a separate pass, or else don't bother at all and just rely on the stdlib's geometric allocation growth.
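The first option looks roughly like this (a generic sketch with illustrative names, not the actual rapidsmpf code): one pass to sum the sizes, a single reserve, then the fill pass.

```cpp
#include <cstddef>
#include <vector>

// Compute the total size up-front so reserve() allocates exactly once,
// instead of calling reserve(ret.size() + group.size()) per group, which
// defeats the vector's geometric growth.
std::vector<int> flatten(std::vector<std::vector<int>> const& groups) {
    std::size_t total = 0;
    for (auto const& g : groups) {
        total += g.size();
    }

    std::vector<int> ret;
    ret.reserve(total);  // one allocation instead of one per group
    for (auto const& g : groups) {
        ret.insert(ret.end(), g.begin(), g.end());
    }
    return ret;
}
```

Repeatedly reserving to the exact current need pins capacity to size, so each subsequent insert reallocates and copies everything written so far; that is where the quadratic cost comes from.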
 * b. Host data
 */

// Parametarized test for MemoryType and types of chunks
Suggested change:
- // Parametarized test for MemoryType and types of chunks
+ // Parametrized test for MemoryType and types of chunks
chunks.emplace_back(5, 5, 101);
chunks.emplace_back(4, 4, 0, len, copy_metadata(), copy_data());
} else {
    RAPIDSMPF_EXPECTS(chunks_type == "empty", "unkown chunk type " + chunks_type);
Suggested change:
- RAPIDSMPF_EXPECTS(chunks_type == "empty", "unkown chunk type " + chunks_type);
+ RAPIDSMPF_EXPECTS(chunks_type == "empty", "unknown chunk type " + chunks_type);
chunk_payload = payload_buf.copy_slice(
    payload_offset, std::ptrdiff_t(chunk_header->gpu_data_size), stream
);
}
Is there a reason this iterator doesn't produce ChunkViews and then the consumer can decide whether to copy or not?
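For concreteness, a non-owning view along these lines might look like the sketch below; the field names are assumptions, not the rapidsmpf definition. The iterator would hand out such views into the batch's buffers, and only a consumer that actually needs ownership would call copy_slice.

```cpp
#include <cstddef>
#include <cstdint>

struct MetadataMessageHeader;  // opaque stand-in for Chunk::MetadataMessageHeader

// Hypothetical non-owning view of one chunk: nothing is allocated or
// copied until the consumer explicitly asks for it.
struct ChunkView {
    MetadataMessageHeader const* header;  // points into the metadata buffer
    std::uint8_t const* payload;          // points into the payload buffer
    std::size_t payload_size;
};
```

This keeps the iteration itself allocation-free and moves the copy-vs-borrow decision to the call site.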
This PR adds the ChunkBatch interface. It serializes each chunk's data into a metadata buffer and a payload buffer. Additional information, such as the chunk batch ID, is also injected at the front of the metadata buffer.
Metadata buffer format:
Payload buffer format:
Closes #175