Skip to content

Commit aba8d6d

Browse files
BewareMyPowershibd
authored andcommitted
Fix buffer overflow for non-batched send when the message metadata size exceeds 64KB (#443)
See apache/pulsar-client-python#223 ### Motivation Currently a shared buffer is used to store serialized message metadata for each send request. However, its capacity is only 64KB, when the metadata size exceeds 64KB, buffer overflow could happen. ### Modifications When the metadata size is too large, allocate a new buffer instead of using the shared buffer. Add `testLargeProperties` to cover it. (cherry picked from commit 8f269e8)
1 parent 8df0acd commit aba8d6d

File tree

2 files changed

+42
-4
lines changed

2 files changed

+42
-4
lines changed

lib/Commands.cc

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ SharedBuffer Commands::newConsumerStats(uint64_t consumerId, uint64_t requestId)
191191
return buffer;
192192
}
193193

194-
PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, ChecksumType checksumType,
194+
PairSharedBuffer Commands::newSend(SharedBuffer& originalHeaders, BaseCommand& cmd, ChecksumType checksumType,
195195
const SendArguments& args) {
196196
cmd.set_type(BaseCommand::SEND);
197197
CommandSend* send = cmd.mutable_send();
@@ -221,9 +221,16 @@ PairSharedBuffer Commands::newSend(SharedBuffer& headers, BaseCommand& cmd, Chec
221221
int totalSize = headerContentSize + payloadSize;
222222
int checksumReaderIndex = -1;
223223

224-
headers.reset();
225-
assert(headers.writableBytes() >= (4 + headerContentSize)); // totalSize + headerLength
226-
headers.writeUnsignedInt(totalSize); // External frame
224+
// By default, headers refers a static buffer whose capacity is 64KB, which can be reused for headers to
225+
// avoid frequent memory allocation. However, if users configure many properties, the size could be great
226+
// that results a buffer overflow. In this case, we can only allocate a new larger buffer.
227+
originalHeaders.reset();
228+
auto headers = originalHeaders;
229+
if (headers.writableBytes() < (4 /* header length */ + headerContentSize)) {
230+
headers = SharedBuffer::allocate(4 + headerContentSize);
231+
}
232+
233+
headers.writeUnsignedInt(totalSize); // External frame
227234

228235
// Write cmd
229236
headers.writeUnsignedInt(cmdSize);

tests/ProducerTest.cc

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,4 +683,35 @@ TEST(ProducerTest, testFailedToCreateNewPartitionProducer) {
683683
client.close();
684684
}
685685

686+
TEST(ProducerTest, testLargeProperties) {
687+
const std::string topic = "producer-test-large-properties-" + std::to_string(time(nullptr));
688+
Client client(serviceUrl);
689+
Producer producer;
690+
ProducerConfiguration conf;
691+
conf.setBatchingEnabled(false);
692+
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
693+
Consumer consumer;
694+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
695+
696+
MessageBuilder::StringMap properties;
697+
constexpr int propertyCount = 20000;
698+
auto builder = MessageBuilder().setContent("msg");
699+
for (int i = 0; i < propertyCount; i++) {
700+
builder.setProperty("key" + std::to_string(i), "value-" + std::to_string(i));
701+
}
702+
703+
// ASSERT_EQ(ResultOk,
704+
// producer.send(MessageBuilder().setContent("msg").setProperties(properties).build()));
705+
ASSERT_EQ(ResultOk, producer.send(builder.build()));
706+
707+
Message msg;
708+
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
709+
ASSERT_EQ(msg.getProperties().size(), propertyCount);
710+
for (int i = 0; i < propertyCount; i++) {
711+
auto it = msg.getProperties().find("key" + std::to_string(i));
712+
ASSERT_NE(it, msg.getProperties().cend());
713+
}
714+
client.close();
715+
}
716+
686717
INSTANTIATE_TEST_CASE_P(Pulsar, ProducerTest, ::testing::Values(true, false));

0 commit comments

Comments
 (0)