Skip to content

Commit fd59be4

Browse files
committed
adding chunk batch
Signed-off-by: niranda perera <[email protected]>
1 parent 78b8483 commit fd59be4

File tree

3 files changed

+71
-121
lines changed

3 files changed

+71
-121
lines changed

cpp/include/rapidsmpf/shuffler/chunk.hpp

+13-4
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ namespace rapidsmpf::shuffler::detail {
2727
using ChunkID = std::uint64_t;
2828

2929
/**
30+
* @brief Chunk with multiple messages.
31+
*
3032
* Format:
3133
* - chunk_id: uint64_t, ID of the chunk
3234
* - n_elements: size_t, Number of messages in the chunk
@@ -123,8 +125,11 @@ class ChunkBatch {
123125
*
124126
* @param i The index of the message.
125127
* @return A new ChunkBatch containing the data of the i-th message.
126-
* @note This will create a copy of the packed data. If i==0 and n_messages() == 1 and
127-
* the message is a data message, the data buffer will be moved to the new ChunkBatch.
128+
* @note In general this creates a copy of the packed data. However, if there is only one message and
129+
* the message is a data message, the buffers will be moved to the new ChunkBatch.
130+
* Otherwise a new ChunkBatch will be created by copying data.
131+
*
132+
* @throws std::out_of_range if the index is out of bounds.
128133
*/
129134
ChunkBatch get_data(ChunkID new_chunk_id, size_t i, rmm::cuda_stream_view stream);
130135

@@ -185,11 +190,16 @@ class ChunkBatch {
185190
* @param chunk_id The ID of the chunk.
186191
* @param part_id The ID of the partition.
187192
* @param packed_data The packed data.
193+
* @param stream The CUDA stream.
188194
* @param br The buffer resource.
189195
* @return The ChunkBatch.
190196
*/
191197
static ChunkBatch from_packed_data(
192-
ChunkID chunk_id, PartID part_id, PackedData&& packed_data, BufferResource* br
198+
ChunkID chunk_id,
199+
PartID part_id,
200+
PackedData&& packed_data,
201+
rmm::cuda_stream_view stream,
202+
BufferResource* br
193203
);
194204

195205
/**
@@ -228,7 +238,6 @@ class ChunkBatch {
228238
*/
229239
static bool validate_metadata_format(std::vector<uint8_t> const& metadata_buf);
230240

231-
232241
private:
233242
/// @brief The beginning of the partition IDs in the chunk.
234243
inline PartID* part_ids_begin() const {

cpp/src/shuffler/chunk.cpp

+13-16
Original file line numberDiff line numberDiff line change
@@ -145,21 +145,10 @@ ChunkBatch ChunkBatch::get_data(
145145
return from_finished_partition(new_chunk_id, part_id(i), expected_num_chunks(i));
146146
}
147147

148-
// Calculate the offset and size of the metadata and data
149-
uint32_t meta_offset = i == 0 ? 0 : *(psum_meta_begin() + i - 1);
150-
uint32_t meta_size = metadata_size(i);
151-
// uint64_t data_offset = i == 0 ? 0 : *(psum_data_begin() + i - 1);
152-
uint64_t data_size = this->data_size(i);
153-
154148
ChunkBatch new_chunk;
155-
156-
// Create metadata vector
157-
new_chunk.metadata_ = std::make_unique<std::vector<uint8_t>>(
158-
concat_metadata_begin() + meta_offset,
159-
concat_metadata_begin() + meta_offset + meta_size
160-
);
161-
162-
if (n_messages() == 1 && data_size > 0) { // i == 0, already veried
149+
if (n_messages() == 1) { // i == 0, already verified
150+
// If there is only one message, move the metadata and data to the new chunk.
151+
new_chunk.metadata_ = std::move(metadata_);
163152
new_chunk.data_ = std::move(data_);
164153
} else {
165154
RAPIDSMPF_EXPECTS(false, "not implemented");
@@ -170,7 +159,11 @@ ChunkBatch ChunkBatch::get_data(
170159
}
171160

172161
ChunkBatch ChunkBatch::from_packed_data(
173-
ChunkID chunk_id, PartID part_id, PackedData&& packed_data, BufferResource* br
162+
ChunkID chunk_id,
163+
PartID part_id,
164+
PackedData&& packed_data,
165+
rmm::cuda_stream_view stream,
166+
BufferResource* br
174167
) {
175168
ChunkBatch chunk;
176169
size_t metadata_buf_size =
@@ -214,7 +207,11 @@ ChunkBatch ChunkBatch::from_packed_data(
214207
// Write data size
215208
*reinterpret_cast<uint64_t*>(chunk.psum_data_begin()) =
216209
packed_data.gpu_data->size();
217-
chunk.data_ = br->move(std::move(packed_data.gpu_data));
210+
chunk.data_ = br->move(
211+
std::move(packed_data.gpu_data),
212+
stream,
213+
std::make_shared<Buffer::Event>(stream)
214+
);
218215
}
219216

220217
return chunk;

cpp/tests/test_chunk.cpp

+45-101
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@ class ChunkBatchTest : public ::testing::Test {
1717
protected:
1818
void SetUp() override {
1919
br = std::make_unique<BufferResource>(cudf::get_current_device_resource_ref());
20+
stream = cudf::get_default_stream();
2021
}
2122

2223
std::unique_ptr<BufferResource> br;
24+
rmm::cuda_stream_view stream;
2325
};
2426

2527
TEST_F(ChunkBatchTest, FromFinishedPartition) {
@@ -44,110 +46,52 @@ TEST_F(ChunkBatchTest, FromFinishedPartition) {
4446
auto chunk2 = ChunkBatch::from_metadata_message(chunk.release_metadata_buffer());
4547
test_chunk(chunk2);
4648

47-
auto chunk3 = chunk2.get_data(chunk_id, 0, cudf::get_default_stream());
49+
auto chunk3 = chunk2.get_data(chunk_id, 0, stream);
4850
test_chunk(chunk3);
51+
52+
EXPECT_THROW(chunk3.get_data(chunk_id, 1, stream), std::out_of_range);
4953
}
5054

51-
// TEST_F(ChunkBatchTest, FromPackedData) {
52-
// ChunkID chunk_id = 123;
53-
// PartID part_id = 456;
54-
55-
// // Create test metadata
56-
// auto metadata = std::make_unique<std::vector<uint8_t>>(4);
57-
// metadata->at(0) = 1;
58-
// metadata->at(1) = 2;
59-
// metadata->at(2) = 3;
60-
// metadata->at(3) = 4;
61-
62-
// // Create test GPU data
63-
// auto gpu_data = br->allocate(MemoryType::DEVICE, 4);
64-
// uint8_t host_data[4] = {5, 6, 7, 8};
65-
// CUDA_TRY(cudaMemcpy(gpu_data->data, host_data, 4, cudaMemcpyHostToDevice));
66-
67-
// PackedData packed_data{std::move(metadata), std::move(gpu_data)};
68-
// auto chunk =
69-
// ChunkBatch::from_packed_data(chunk_id, part_id, std::move(packed_data),
70-
// br.get());
71-
72-
// EXPECT_EQ(chunk.chunk_id(), chunk_id);
73-
// EXPECT_EQ(chunk.n_messages(), 1);
74-
// EXPECT_EQ(chunk.part_id(0), part_id);
75-
// EXPECT_EQ(chunk.expected_num_chunks(0), 0);
76-
// EXPECT_FALSE(chunk.is_control_message(0));
77-
// EXPECT_EQ(chunk.metadata_size(0), 4);
78-
// EXPECT_EQ(chunk.data_size(0), 4);
79-
// }
80-
81-
// TEST_F(ChunkBatchTest, ValidateMetadataFormat) {
82-
// // Test valid metadata format
83-
// auto valid_chunk = ChunkBatch::from_finished_partition(123, 456, 789);
84-
// EXPECT_TRUE(ChunkBatch::validate_metadata_format(*valid_chunk.release_metadata_buffer(
85-
// )));
86-
87-
// // Test invalid metadata format (too small)
88-
// std::vector<uint8_t> too_small(4);
89-
// EXPECT_FALSE(ChunkBatch::validate_metadata_format(too_small));
90-
91-
// // Test invalid metadata format (zero messages)
92-
// std::vector<uint8_t> zero_messages(16);
93-
// *reinterpret_cast<ChunkID*>(zero_messages.data()) = 123;
94-
// *reinterpret_cast<size_t*>(zero_messages.data() + sizeof(ChunkID)) = 0;
95-
// EXPECT_FALSE(ChunkBatch::validate_metadata_format(zero_messages));
96-
// }
97-
98-
// TEST_F(ChunkBatchTest, GetData) {
99-
// ChunkID chunk_id = 123;
100-
// PartID part_id = 456;
101-
102-
// // Create test metadata
103-
// auto metadata = std::make_unique<std::vector<uint8_t>>(4);
104-
// metadata->at(0) = 1;
105-
// metadata->at(1) = 2;
106-
// metadata->at(2) = 3;
107-
// metadata->at(3) = 4;
108-
109-
// // Create test GPU data
110-
// auto gpu_data = br->allocate(MemoryType::DEVICE, 4);
111-
// uint8_t host_data[4] = {5, 6, 7, 8};
112-
// CUDA_TRY(cudaMemcpy(gpu_data->data, host_data, 4, cudaMemcpyHostToDevice));
113-
114-
// PackedData packed_data{std::move(metadata), std::move(gpu_data)};
115-
// auto chunk =
116-
// ChunkBatch::from_packed_data(chunk_id, part_id, std::move(packed_data),
117-
// br.get());
118-
119-
// // Test getting data from a data message
120-
// auto new_chunk = chunk.get_data(789, 0, cudf::get_default_stream());
121-
// EXPECT_EQ(new_chunk.chunk_id(), 789);
122-
// EXPECT_EQ(new_chunk.n_messages(), 1);
123-
// EXPECT_EQ(new_chunk.part_id(0), part_id);
124-
// EXPECT_EQ(new_chunk.metadata_size(0), 4);
125-
// EXPECT_EQ(new_chunk.data_size(0), 4);
126-
127-
// // Test getting data from a control message
128-
// auto control_chunk = ChunkBatch::from_finished_partition(123, 456, 789);
129-
// auto new_control_chunk = control_chunk.get_data(999, 0,
130-
// cudf::get_default_stream()); EXPECT_EQ(new_control_chunk.chunk_id(), 999);
131-
// EXPECT_EQ(new_control_chunk.n_messages(), 1);
132-
// EXPECT_EQ(new_control_chunk.part_id(0), 456);
133-
// EXPECT_EQ(new_control_chunk.expected_num_chunks(0), 789);
134-
// EXPECT_TRUE(new_control_chunk.is_control_message(0));
135-
// }
136-
137-
// TEST_F(ChunkBatchTest, FromMetadataMessage) {
138-
// // Create a chunk and convert it to metadata message
139-
// auto original_chunk = ChunkBatch::from_finished_partition(123, 456, 789);
140-
// auto metadata_msg = original_chunk.release_metadata_buffer();
141-
142-
// // Create new chunk from metadata message
143-
// auto new_chunk = ChunkBatch::from_metadata_message(std::move(metadata_msg));
144-
145-
// EXPECT_EQ(new_chunk.chunk_id(), 123);
146-
// EXPECT_EQ(new_chunk.n_messages(), 1);
147-
// EXPECT_EQ(new_chunk.part_id(0), 456);
148-
// EXPECT_EQ(new_chunk.expected_num_chunks(0), 789);
149-
// EXPECT_TRUE(new_chunk.is_control_message(0));
150-
// }
55+
TEST_F(ChunkBatchTest, FromPackedData) {
56+
ChunkID chunk_id = 123;
57+
PartID part_id = 456;
58+
59+
// Create test metadata
60+
auto metadata =
61+
std::make_unique<std::vector<uint8_t>>(std::vector<uint8_t>{1, 2, 3, 4});
62+
63+
// Create test GPU data
64+
auto data = std::make_unique<rmm::device_buffer>(4, cudf::get_default_stream());
65+
std::vector<uint8_t> host_data{5, 6, 7, 8};
66+
RAPIDSMPF_CUDA_TRY(
67+
cudaMemcpy(data->data(), host_data.data(), 4, cudaMemcpyHostToDevice)
68+
);
69+
70+
PackedData packed_data{
71+
std::make_unique<std::vector<uint8_t>>(*metadata), std::move(data)
72+
};
73+
74+
auto test_chunk = [&](ChunkBatch& chunk) {
75+
EXPECT_EQ(chunk.chunk_id(), chunk_id);
76+
EXPECT_EQ(chunk.n_messages(), 1);
77+
EXPECT_EQ(chunk.part_id(0), part_id);
78+
EXPECT_EQ(chunk.expected_num_chunks(0), 0);
79+
EXPECT_FALSE(chunk.is_control_message(0));
80+
EXPECT_EQ(chunk.metadata_size(0), 4);
81+
EXPECT_EQ(chunk.data_size(0), 4);
82+
};
83+
auto chunk = ChunkBatch::from_packed_data(
84+
chunk_id, part_id, std::move(packed_data), stream, br.get()
85+
);
86+
test_chunk(chunk);
87+
88+
auto chunk2 = ChunkBatch::from_metadata_message(chunk.release_metadata_buffer());
89+
chunk2.set_data_buffer(chunk.release_data_buffer());
90+
test_chunk(chunk2);
91+
92+
auto chunk3 = chunk2.get_data(chunk_id, 0, stream);
93+
test_chunk(chunk3);
94+
}
15195

15296
} // namespace test
15397
} // namespace rapidsmpf::shuffler::detail

0 commit comments

Comments
 (0)