Adding ChunkBatch interface #181


Closed

Conversation

nirandaperera
Contributor

@nirandaperera nirandaperera commented Apr 8, 2025

This PR adds the ChunkBatch interface.

It serializes each chunk's data into a metadata buffer and a payload buffer. Additional information, such as the chunk batch ID, is also injected at the front of the metadata buffer.

Metadata buffer format:

| BatchHeader | [[MetadataMessageHeader, Metadata], ...] |

Payload buffer format:

| [Data, ...] |

waiting on #178

Closes #175

@nirandaperera nirandaperera requested a review from a team as a code owner April 8, 2025 07:40
@nirandaperera nirandaperera marked this pull request as draft April 8, 2025 07:40
@nirandaperera nirandaperera requested review from madsbk and wence- April 8, 2025 07:54
@nirandaperera nirandaperera added improvement Improves an existing functionality non-breaking Introduces a non-breaking change labels Apr 8, 2025
@nirandaperera
Contributor Author

nirandaperera commented Apr 8, 2025

@madsbk @wence- this has the ChunkBatch outline. I will add the tests later today.

* @throws std::logic_error if copy violates the bounds of the destination buffer.
*/
[[nodiscard]] size_t copy_to(
Buffer& dest, size_t offset, rmm::cuda_stream_view stream
Contributor

This seems like we want to introduce a BufferView object that encapsulates the Buffer, offset pair.

Contributor Author

Yes. @madsbk and I discussed this and decided to punt it for later.

* size_t metadata_offset,
* Buffer const& payload_buf,
* size_t payload_offset)
* @param visitor visitor function
Contributor

question/thought: This looks like what you want is an iterator over the chunkbatch, so you can do something like:

for (Chunk c = batch.begin(); c != batch.end(); c++) {
   ...
}

To process chunks.

I think it should be possible to write such an iterator, and maybe it is more generic.
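A minimal sketch of such a forward iterator over a packed buffer (the per-chunk header and layout here are assumptions for illustration, not the PR's actual types) might look like:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <vector>

// Assumed per-chunk header: a length prefix before each chunk's metadata.
struct ChunkHeader {
    uint64_t metadata_size;
};

// Forward iterator that walks |[ChunkHeader, metadata]...| sequentially.
class ChunkIterator {
  public:
    explicit ChunkIterator(uint8_t const* pos) : pos_(pos) {}

    // Read the header at the current position (memcpy avoids alignment issues).
    ChunkHeader header() const {
        ChunkHeader h;
        std::memcpy(&h, pos_, sizeof h);
        return h;
    }

    // Advance past the current header plus its metadata.
    ChunkIterator& operator++() {
        pos_ += sizeof(ChunkHeader) + header().metadata_size;
        return *this;
    }

    bool operator!=(ChunkIterator const& other) const { return pos_ != other.pos_; }

  private:
    uint8_t const* pos_;
};
```

Note that a forward iterator like this still only supports sequential traversal; random access needs extra index information in the header (see the prefix-sum discussion below in this thread).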

Member

Is this what #202 is doing?

Contributor Author

@pentschev yes, I added the iterator after @wence-'s comment.

/// chunk.
/// |BatchHeader|[[MetadataMessageHeader, Metadata], ...]|
///
/// TODO: change the format to have thhe MetadataMessageHeaders at the front (after
Contributor

Suggested change
/// TODO: change the format to have thhe MetadataMessageHeaders at the front (after
/// TODO: change the format to have the MetadataMessageHeaders at the front (after

Member

Still missing, also switch to /* ... */ to keep style.

Member

Still missing.

case MemoryType::DEVICE:
return std::make_unique<Buffer>(Buffer{
std::make_unique<rmm::device_buffer>(
static_cast<cuda::std::byte const*>(device()->data()) + offset,
Contributor

Nit: can just use std::byte here. Also, either way, IWYU.

Contributor Author

It was suggested to use cuda::std::byte in this slack thread.

Comment on lines 45 to 47
// We need at least (sizeof(MetadataMessageHeader) + metadata_size) amount of space
// from the offset
msg.resize(offset + sizeof(MetadataMessageHeader) + metadata_size);
Contributor

question: At least or exactly?

Also, this looks like a performance antipattern because if we're doing this to serialise a batch of chunks, we'll have quadratic reallocation behaviour.

Contributor Author

I thought about it too. If msg.size() is already larger than the resize count, it should return immediately as a no-op, shouldn't it?
For batching, the buffer is already preallocated, so resize should not do any reallocations.

Contributor

No

Complexity
Linear in the difference between the current size and count. Additional complexity possible due to reallocation if capacity is less than count.

Contributor Author

Yeah. My bad! 🙁

Contributor Author

I removed the resize now

size_t batch_metadata_size =
batch_header_size + chunk_metadata_header_size * chunks.size();
size_t batch_payload_size = 0;
MemoryType mem_type = chunks[0].gpu_data->mem_type;
Contributor

nit: UB if there are no chunks.

Contributor Author

Thanks @wence- . Good catch

size_t payload_offset = 0;
for (auto&& chunk : chunks) {
// copy metadata
metadata_offset += chunk.to_metadata_message(*metadata_buffer, metadata_offset);
Contributor

On line 44 we made the buffer the right size for all the chunks. But here we resize it every time to the "current" size. Which means we will (potentially) reallocate many times.

Contributor

It seems like we want to define a new metadata format for a batched chunk that would also allow random access into the serialised wire format. Right now it seems like I must iterate through all the metadata to pick out a given chunk.

What information do we need to send, how are we currently packing it, and what would be the best way to pack it?

Contributor Author

I think, for random access, we would need a prefix-sum array for both the metadata and the GPU data. Then we should be able to stride to the corresponding location. This could be part of the BatchHeader.
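A sketch of the prefix-sum idea (names are illustrative, not the PR's API): storing an exclusive prefix sum of per-chunk sizes in the batch header turns chunk i's offset and length into O(1) lookups instead of a scan.

```cpp
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

// Exclusive prefix sum of per-chunk sizes: offsets[i] is where chunk i
// starts, offsets.back() is the total size, and chunk i spans
// [offsets[i], offsets[i + 1]).
std::vector<uint64_t> chunk_offsets(std::vector<uint64_t> const& sizes) {
    std::vector<uint64_t> offsets(sizes.size() + 1, 0);
    std::partial_sum(sizes.begin(), sizes.end(), offsets.begin() + 1);
    return offsets;
}
```

One such array for the metadata buffer and one for the payload buffer would be enough for random access into both.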

@nirandaperera nirandaperera requested a review from wence- April 9, 2025 16:42
@nirandaperera nirandaperera marked this pull request as ready for review April 9, 2025 16:42
@nirandaperera nirandaperera requested a review from a team as a code owner April 10, 2025 22:52
Member

@pentschev pentschev left a comment

I haven't been able to review all of it yet and will complete tomorrow. Left a few comments for now.

Member

@pentschev pentschev left a comment

Left some more comments. However, I'm not really sure what ChunkBatch is planned to be used for. I think you have discussed this with Mads and Lawrence previously, so they may know the context already. Could you write down in the issue description what the planned goal for ChunkBatch is? A good way to deal with this is to write a proper description in issues instead of just a title, like #175.

Naively, I think the purpose here is just packing more data into a single message, but I don't know if that's correct. If it is, have you considered things like the increased memory pressure this will cause, since there will now be, at least temporarily, multiple copies of the data in device/host memory?

"unable to reserve gpu memory for batch"
);
payload_data = br->allocate(*mem_type, batch_payload_size, stream, reservation);
RAPIDSMP_EXPECTS(reservation.size() == 0, "didn't use all of the reservation");
Member

When is this supposed to happen?

Contributor Author

Comment on lines 112 to 133
// visit chunk data and verify if the given buffers adhere to the format
size_t visited_metadata_size = batch_header_size;
size_t visited_payload_size = 0;
batch.visit_chunk_data([&](Chunk::MetadataMessageHeader const* chunk_header,
auto const& /* metadata_buf */,
auto /* metadata_offset */,
auto const& /* payload_buf */,
auto /* payload_offset */) {
visited_metadata_size +=
(chunk_metadata_header_size + chunk_header->metadata_size);
visited_payload_size += chunk_header->gpu_data_size;
});
RAPIDSMP_EXPECTS(
visited_metadata_size == batch.metadata_buffer_->size(),
"visited metadata size doesn't match the metadata buffer size"
);
if (batch.payload_data_) {
RAPIDSMP_EXPECTS(
visited_payload_size == batch.payload_data_->size,
"visited payload size doesn't match the payload buffer size"
);
}
Member

Why is this needed, IOW when would they not adhere?

Contributor Author

This is to ensure that the two given buffers adhere to the format. For example, we could be given a valid metadata buffer while the payload buffer is empty (or zeroed). Both buffers are tied together, so they need to be validated against each other.

@nirandaperera
Contributor Author

@pentschev Thank you for the review. It was my bad, I should have added a detailed explanation under issue #175. Let me add some more information.

Comment on lines +39 to +43
/**
* @brief The size of the chunk metadata header in bytes.
*/
static constexpr std::ptrdiff_t chunk_metadata_header_size =
sizeof(Chunk::MetadataMessageHeader);
Member

Suggested change
/**
* @brief The size of the chunk metadata header in bytes.
*/
static constexpr std::ptrdiff_t chunk_metadata_header_size =
sizeof(Chunk::MetadataMessageHeader);
static constexpr std::ptrdiff_t chunk_metadata_header_size =
sizeof(Chunk::MetadataMessageHeader); ///< The size of the chunk metadata header in bytes.

Member

Actually, I think this is just an attribute and not a method; instead of @brief we probably want to treat it like other attributes, with ///<.

Comment on lines +55 to +56
/** @brief The size of the batch header in bytes. */
static constexpr std::ptrdiff_t batch_header_size = sizeof(BatchHeader);
Member

Suggested change
/** @brief The size of the batch header in bytes. */
static constexpr std::ptrdiff_t batch_header_size = sizeof(BatchHeader);
static constexpr std::ptrdiff_t batch_header_size = sizeof(BatchHeader); ///< The size of the batch header in bytes.

/// traversal pattern.
std::unique_ptr<std::vector<uint8_t>> metadata_buffer_;

/// GPU data buffer of the packed `cudf::table` associated with this chunk.
Member

Still missing.

/// chunk.
/// |BatchHeader|[[MetadataMessageHeader, Metadata], ...]|
///
/// TODO: change the format to have thhe MetadataMessageHeaders at the front (after
Member

Still missing.

Comment on lines +166 to +195
assert(metadata_buffer_);
assert(metadata_buffer_->size() >= batch_header_size);

std::ptrdiff_t metadata_offset = batch_header_size;
std::ptrdiff_t payload_offset = 0;

for (size_t i = 0; i < header()->num_chunks; ++i) {
assert(
std::ptrdiff_t(metadata_buffer_->size())
>= metadata_offset + chunk_metadata_header_size
);

auto const* chunk_header =
reinterpret_cast<Chunk::MetadataMessageHeader const*>(
metadata_buffer_->data() + metadata_offset
);
metadata_offset += chunk_metadata_header_size;

assert(
metadata_buffer_->size()
>= size_t(metadata_offset) + chunk_header->metadata_size
);

if (chunk_header->gpu_data_size > 0) {
assert(payload_data_);
assert(
payload_data_->size
>= size_t(payload_offset) + chunk_header->gpu_data_size
);
}
Member

Should we use RAPIDSMPF_EXPECTS instead of asserts here?

Contributor Author

I consciously added asserts here because I felt we can omit these checks in the release build.
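The trade-off in question, sketched with a hypothetical macro (RAPIDSMPF_EXPECTS belongs to the project and is not defined here; its real behavior may differ): assert() disappears under -DNDEBUG in release builds, while a throwing check is always evaluated.

```cpp
#include <cassert>
#include <stdexcept>

// Hypothetical always-on check, in the spirit of RAPIDSMPF_EXPECTS: the
// condition is evaluated in every build type, unlike assert() under NDEBUG.
#define ALWAYS_EXPECTS(cond, msg)                 \
    do {                                          \
        if (!(cond)) throw std::logic_error(msg); \
    } while (0)

// Returns true if the check raised an exception for the given condition.
bool check_throws(bool cond) {
    try {
        ALWAYS_EXPECTS(cond, "precondition violated");
        return false;  // no exception raised
    } catch (std::logic_error const&) {
        return true;
    }
}
```

So asserts keep the release hot path free of these bounds checks, at the cost of not catching malformed batches in production.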

Comment on lines +284 to +285
* @brief Postfix increment of the iterator.
* @return Copy of the iterator before increment
Member

Suggested change
* @brief Postfix increment of the iterator.
* @return Copy of the iterator before increment
* @brief Postfix increment of the iterator.
*
* @return Copy of the iterator before increment

Comment on lines +290 to +291
* @brief Equality comparison of iterators.
* @param other The other iterator to compare with
Member

Suggested change
* @brief Equality comparison of iterators.
* @param other The other iterator to compare with
* @brief Equality comparison of iterators.
*
* @param other The other iterator to compare with

Comment on lines +306 to +324
/**
* @brief Inequality comparison of iterators.
* @param other The other iterator to compare with
* @return true if the iterators are not equal, false otherwise
*/
bool operator!=(ChunkForwardIterator const& other) const;

/**
* @brief Copy constructor.
* @param other The other iterator to copy from.
*/
ChunkForwardIterator(const ChunkForwardIterator& other) = default;

/**
* @brief Copy assignment operator.
* @param other The other iterator to copy from.
* @return Reference to the assigned iterator.
*/
ChunkForwardIterator& operator=(const ChunkForwardIterator& other) = default;
Member

Suggested change
/**
* @brief Inequality comparison of iterators.
* @param other The other iterator to compare with
* @return true if the iterators are not equal, false otherwise
*/
bool operator!=(ChunkForwardIterator const& other) const;
/**
* @brief Copy constructor.
* @param other The other iterator to copy from.
*/
ChunkForwardIterator(const ChunkForwardIterator& other) = default;
/**
* @brief Copy assignment operator.
* @param other The other iterator to copy from.
* @return Reference to the assigned iterator.
*/
ChunkForwardIterator& operator=(const ChunkForwardIterator& other) = default;
/**
* @brief Inequality comparison of iterators.
*
* @param other The other iterator to compare with
* @return true if the iterators are not equal, false otherwise
*/
bool operator!=(ChunkForwardIterator const& other) const;
/**
* @brief Copy constructor.
*
* @param other The other iterator to copy from.
*/
ChunkForwardIterator(const ChunkForwardIterator& other) = default;
/**
* @brief Copy assignment operator.
*
* @param other The other iterator to copy from.
* @return Reference to the assigned iterator.
*/
ChunkForwardIterator& operator=(const ChunkForwardIterator& other) = default;

Comment on lines +349 to +370
* @brief Unwrap the current chunk header.
* @return Chunk header ptr.
*/
inline Chunk::MetadataMessageHeader const* chunk_header() const {
return reinterpret_cast<Chunk::MetadataMessageHeader const*>(
batch_.metadata_buffer_->data() + metadata_offset_
);
}

/**
* @brief Check if current position contains a chunks.
* @return True, if metadata offset points to a valid chunk.
*/
inline bool has_chunk() const {
return batch_.size() > 0
&& (size_t(metadata_offset_) < batch_.metadata_buffer_->size());
}

/**
* @brief Make a chunk.
* @return Chunk wrapped in a shared ptr
*/
Member

Suggested change
* @brief Unwrap the current chunk header.
* @return Chunk header ptr.
*/
inline Chunk::MetadataMessageHeader const* chunk_header() const {
return reinterpret_cast<Chunk::MetadataMessageHeader const*>(
batch_.metadata_buffer_->data() + metadata_offset_
);
}
/**
* @brief Check if current position contains a chunks.
* @return True, if metadata offset points to a valid chunk.
*/
inline bool has_chunk() const {
return batch_.size() > 0
&& (size_t(metadata_offset_) < batch_.metadata_buffer_->size());
}
/**
* @brief Make a chunk.
* @return Chunk wrapped in a shared ptr
*/
* @brief Unwrap the current chunk header.
*
* @return Chunk header ptr.
*/
inline Chunk::MetadataMessageHeader const* chunk_header() const {
return reinterpret_cast<Chunk::MetadataMessageHeader const*>(
batch_.metadata_buffer_->data() + metadata_offset_
);
}
/**
* @brief Check if current position contains a chunks.
*
* @return True, if metadata offset points to a valid chunk.
*/
inline bool has_chunk() const {
return batch_.size() > 0
&& (size_t(metadata_offset_) < batch_.metadata_buffer_->size());
}
/**
* @brief Make a chunk.
*
* @return Chunk wrapped in a shared ptr
*/

Member

Additionally, move the implementations to a .cpp file?

Comment on lines +169 to +171
// std::vector<Chunk> const chunks = batch.get_chunks(stream);
// EXPECT_EQ(exp_chunks.size(), chunks.size());

Member

Suggested change
// std::vector<Chunk> const chunks = batch.get_chunks(stream);
// EXPECT_EQ(exp_chunks.size(), chunks.size());

Member

I think get_chunks was removed, and thus we could remove those lines.

@nirandaperera
Contributor Author

@wence- @pentschev @madsbk CI seems to have some trouble with this PR. I think it is due to the repo name change.
Please refer to this updated PR #231

@wence-
Contributor

wence- commented Apr 29, 2025

Closing in favour of #231.

@wence- wence- closed this Apr 29, 2025
Successfully merging this pull request may close these issues.

Add a chunk batch interface
4 participants