Skip to content

Commit 3a496a4

Browse files
committed
fix: enhance error handling for empty request body and invalid lengths in message processing
1 parent bfc564a commit 3a496a4

3 files changed

Lines changed: 26 additions & 7 deletions

File tree

deps/rocketmq/src/ClientRemotingProcessor.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,6 +168,13 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
168168
}
169169

170170
auto body = request->body();
171+
if (body == nullptr) {
172+
LOG_WARN_NEW("receive reply message without body");
173+
response->set_code(MQResponseCode::SYSTEM_ERROR);
174+
response->set_remark("reply message body is empty");
175+
return response.release();
176+
}
177+
171178
if ((requestHeader->sys_flag() & MessageSysFlag::COMPRESSED_FLAG) == MessageSysFlag::COMPRESSED_FLAG) {
172179
std::string origin_body;
173180
if (UtilAll::inflate(*body, origin_body)) {

deps/rocketmq/src/message/MessageDecoder.cpp

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -185,15 +185,24 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody, bool
185185
}
186186

187187
// 16 TOPIC
188-
int8_t topicLen = byteBuffer.get();
188+
int8_t topicLenRaw = byteBuffer.get();
189+
int topicLen = static_cast<uint8_t>(topicLenRaw);
190+
if (topicLen > byteBuffer.remaining()) {
191+
LOG_ERROR_NEW("Invalid topic length:{}, remain: {}", topicLen, byteBuffer.remaining());
192+
return nullptr;
193+
}
189194
ByteArray topic(topicLen);
190195
byteBuffer.get(topic);
191196
msgExt->set_topic(topic.array(), topic.size());
192197

193198
// 17 properties
194-
int16_t propertiesLen = byteBuffer.getShort();
195-
if (propertiesLen > 0) {
196-
ByteArray properties(propertiesLen);
199+
int16_t propertiesLenRaw = byteBuffer.getShort();
200+
if (propertiesLenRaw < 0 || propertiesLenRaw > byteBuffer.remaining()) {
201+
LOG_ERROR_NEW("Invalid properties length:{}, remain:{}", propertiesLenRaw, byteBuffer.remaining());
202+
return nullptr;
203+
}
204+
if (propertiesLenRaw > 0) {
205+
ByteArray properties(propertiesLenRaw);
197206
byteBuffer.get(properties);
198207
std::string propertiesString(properties.array(), properties.size());
199208
std::map<std::string, std::string> propertiesMap = string2messageProperties(propertiesString);

deps/rocketmq/src/transport/TcpRemotingClient.cpp

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,10 +221,11 @@ std::unique_ptr<RemotingCommand> TcpRemotingClient::invokeSync(const std::string
221221
try {
222222
doBeforeRpcHooks(addr, request, true);
223223
auto costTime = UtilAll::currentTimeMillis() - beginStartTime;
224-
if (timeoutMillis <= 0 || timeoutMillis < costTime) {
224+
if (timeoutMillis <= 0 || timeoutMillis <= costTime) {
225225
THROW_MQEXCEPTION(RemotingTimeoutException, "invokeSync call timeout", -1);
226226
}
227-
std::unique_ptr<RemotingCommand> response(invokeSyncImpl(channel, request, timeoutMillis));
227+
auto remainTimeoutMillis = timeoutMillis - costTime;
228+
std::unique_ptr<RemotingCommand> response(invokeSyncImpl(channel, request, remainTimeoutMillis));
228229
doAfterRpcHooks(addr, request, response.get(), false);
229230
return response;
230231
} catch (const RemotingSendRequestException& e) {
@@ -636,7 +637,9 @@ void TcpRemotingClient::processRequestCommand(std::unique_ptr<RemotingCommand> r
636637

637638
doBeforeRpcHooks(channel->getPeerAddrAndPort(), *requestCommand, false);
638639
response.reset(processor->processRequest(channel, requestCommand.get()));
639-
doAfterRpcHooks(channel->getPeerAddrAndPort(), *response, response.get(), true);
640+
if (response != nullptr) {
641+
doAfterRpcHooks(channel->getPeerAddrAndPort(), *response, response.get(), true);
642+
}
640643
} catch (std::exception& e) {
641644
LOG_ERROR_NEW("process request exception. {}", e.what());
642645

0 commit comments

Comments
 (0)