Skip to content

Commit e8df008

Browse files
committed
fix: improve code quality and add unit tests for various components
1 parent 1c5f01e commit e8df008

114 files changed

Lines changed: 8001 additions & 571 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ Debug/
204204
Release/
205205
toolchain/
206206
deps/rocketmq/bin
207+
deps/rocketmq/test/bin
207208
deps/rocketmq/tmp_*
208209
deps/rocketmq/libs/signature/lib
209210
-

deps/rocketmq/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16-
cmake_minimum_required(VERSION 3.15)
16+
cmake_minimum_required(VERSION 3.5)
1717

1818
# CMake complains if we don't have this.
1919
if(COMMAND cmake_policy)

deps/rocketmq/build.sh

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -455,6 +455,7 @@ build_rocketmq_client() {
455455
-DCMAKE_LINKER="${LINK}"
456456
-DCMAKE_SYSTEM_PROCESSOR="${CMAKE_SYSTEM_PROCESSOR}"
457457
-DCMAKE_SYSTEM_NAME="${CMAKE_SYSTEM_NAME}"
458+
-DCMAKE_POLICY_VERSION_MINIMUM=3.5
458459
-DLibevent_USE_STATIC_LIBS=ON
459460
-DJSONCPP_USE_STATIC_LIBS=ON
460461
-DZLIB_USE_STATIC_LIBS=ON
@@ -479,6 +480,11 @@ build_rocketmq_client() {
479480
log_info "Compiling RocketMQ C++ client (static + bundled library)"
480481
cmake --build "${BUILD_DIR}" --target rocketmq_static rocketmq_
481482

483+
if [ $TEST -eq 1 ]; then
484+
log_info "Building unit test binaries"
485+
cmake --build "${BUILD_DIR}"
486+
fi
487+
482488
# Uncomment if installation is needed
483489
# log_info "Installing RocketMQ C++ client"
484490
# cmake --install "${BUILD_DIR}"
@@ -524,6 +530,7 @@ build_googletest() {
524530
-DCMAKE_RANLIB="${RANLIB}" \
525531
-DCMAKE_LINKER="${LINK}" \
526532
-DCMAKE_CXX_FLAGS=-fPIC \
533+
-DCMAKE_POLICY_VERSION_MINIMUM=3.5 \
527534
-DBUILD_STATIC_LIBS=ON \
528535
-DBUILD_SHARED_LIBS=OFF \
529536
-DCMAKE_INSTALL_PREFIX="${INSTALL_LIB_DIR}"

deps/rocketmq/include/DefaultLitePullConsumer.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
#ifndef ROCKETMQ_DEFAULTLITEPULLCONSUMER_H_
1818
#define ROCKETMQ_DEFAULTLITEPULLCONSUMER_H_
1919

20+
#include <memory>
21+
2022
#include "DefaultLitePullConsumerConfigProxy.h"
2123
#include "LitePullConsumer.h"
24+
#include "PullResult.h"
2225
#include "RPCHook.h"
2326

2427
namespace rocketmq {
@@ -67,6 +70,13 @@ class ROCKETMQCLIENT_API DefaultLitePullConsumer : public DefaultLitePullConsume
6770
const std::string& topic,
6871
TopicMessageQueueChangeListener* topicMessageQueueChangeListener) override;
6972

73+
std::unique_ptr<PullResult> pullOnce(const MQMessageQueue& mq,
74+
const std::string& subExpression,
75+
int64_t offset,
76+
int maxNums,
77+
bool block,
78+
long timeoutMillis) override;
79+
7080
public:
7181
void setRPCHook(RPCHookPtr rpcHook);
7282

deps/rocketmq/include/LitePullConsumer.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,17 @@
1717
#ifndef ROCKETMQ_LITEPULLCONSUMER_H_
1818
#define ROCKETMQ_LITEPULLCONSUMER_H_
1919

20+
#include <memory>
21+
2022
#include "TopicMessageQueueChangeListener.h"
2123
#include "MessageSelector.h"
2224
#include "MQMessageExt.h"
2325
#include "MQMessageQueue.h"
2426

2527
namespace rocketmq {
2628

29+
class PullResult;
30+
2731
/**
2832
* LitePullConsumer - interface for pull consumer
2933
*/
@@ -72,6 +76,13 @@ class ROCKETMQCLIENT_API LitePullConsumer {
7276
virtual void registerTopicMessageQueueChangeListener(
7377
const std::string& topic,
7478
TopicMessageQueueChangeListener* topicMessageQueueChangeListener) = 0;
79+
80+
virtual std::unique_ptr<PullResult> pullOnce(const MQMessageQueue& mq,
81+
const std::string& subExpression,
82+
int64_t offset,
83+
int maxNums,
84+
bool block,
85+
long timeout_millis) = 0;
7586
};
7687

7788
} // namespace rocketmq

deps/rocketmq/src/MQClientAPIImpl.h

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -88,16 +88,19 @@ class MQClientAPIImpl {
8888

8989
MQMessageExt viewMessage(const std::string& addr, int64_t phyoffset, int timeoutMillis);
9090

91-
int64_t searchOffset(const std::string& addr,
92-
const std::string& topic,
93-
int queueId,
94-
int64_t timestamp,
95-
int timeoutMillis);
91+
virtual int64_t searchOffset(const std::string& addr,
92+
const std::string& topic,
93+
int queueId,
94+
int64_t timestamp,
95+
int timeoutMillis);
9696

97-
int64_t getMaxOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
98-
int64_t getMinOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
97+
virtual int64_t getMaxOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
98+
virtual int64_t getMinOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
9999

100-
int64_t getEarliestMsgStoretime(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
100+
virtual int64_t getEarliestMsgStoretime(const std::string& addr,
101+
const std::string& topic,
102+
int queueId,
103+
int timeoutMillis);
101104

102105
void getConsumerIdListByGroup(const std::string& addr,
103106
const std::string& consumerGroup,
@@ -141,7 +144,7 @@ class MQClientAPIImpl {
141144
int timeoutMillis,
142145
bool oneway = false);
143146

144-
TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
147+
virtual TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
145148

146149
TopicList* getTopicListFromNameServer();
147150

deps/rocketmq/src/common/SubscriptionGroupConfig.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ namespace rocketmq {
2323

2424
class SubscriptionGroupConfig {
2525
public:
26-
SubscriptionGroupConfig(const string& groupName) {
26+
SubscriptionGroupConfig(const std::string& groupName) {
2727
this->groupName = groupName;
2828
consumeEnable = true;
2929
consumeFromMinEnable = true;

deps/rocketmq/src/common/VirtualEnvUtil.cpp

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,30 +26,41 @@ static const char* VIRTUAL_APPGROUP_PREFIX = "%%PROJECT_%s%%";
2626
namespace rocketmq {
2727

2828
std::string VirtualEnvUtil::buildWithProjectGroup(const std::string& origin, const std::string& projectGroup) {
29-
if (!UtilAll::isBlank(projectGroup)) {
30-
char prefix[1024];
31-
sprintf(prefix, VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
32-
33-
if (origin.find_last_of(prefix) == std::string::npos) {
34-
return origin + prefix;
35-
} else {
36-
return origin;
37-
}
38-
} else {
29+
if (UtilAll::isBlank(projectGroup)) {
3930
return origin;
4031
}
32+
33+
char prefix[1024];
34+
sprintf(prefix, VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
35+
std::string suffix(prefix);
36+
37+
if (origin.size() >= suffix.size() &&
38+
origin.compare(origin.size() - suffix.size(), suffix.size(), suffix) == 0) {
39+
return origin;
40+
}
41+
42+
return origin + suffix;
4143
}
4244

4345
std::string VirtualEnvUtil::clearProjectGroup(const std::string& origin, const std::string& projectGroup) {
46+
if (UtilAll::isBlank(projectGroup)) {
47+
return origin;
48+
}
49+
4450
char prefix[1024];
4551
sprintf(prefix, VIRTUAL_APPGROUP_PREFIX, projectGroup.c_str());
46-
std::string::size_type pos = origin.find_last_of(prefix);
52+
std::string suffix(prefix);
4753

48-
if (!UtilAll::isBlank(prefix) && pos != std::string::npos) {
49-
return origin.substr(0, pos);
50-
} else {
54+
if (origin.size() < suffix.size()) {
5155
return origin;
5256
}
57+
58+
auto pos = origin.rfind(suffix);
59+
if (pos != std::string::npos && pos + suffix.size() == origin.size()) {
60+
return origin.substr(0, pos);
61+
}
62+
63+
return origin;
5364
}
5465

5566
} // namespace rocketmq

deps/rocketmq/src/consumer/AssignedMessageQueue.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,8 @@ class MessageQueueState {
6565

6666
class AssignedMessageQueue {
6767
public:
68+
AssignedMessageQueue() : rebalance_impl_(nullptr) {}
69+
6870
std::vector<MQMessageQueue> messageQueues() {
6971
std::vector<MQMessageQueue> mqs;
7072
std::lock_guard<std::mutex> lock(assigned_message_queue_state_mutex_);

deps/rocketmq/src/consumer/DefaultLitePullConsumer.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,16 @@ void DefaultLitePullConsumer::registerTopicMessageQueueChangeListener(
123123
pull_consumer_impl_->registerTopicMessageQueueChangeListener(topic, topicMessageQueueChangeListener);
124124
}
125125

126+
std::unique_ptr<PullResult> DefaultLitePullConsumer::pullOnce(const MQMessageQueue& mq,
127+
const std::string& subExpression,
128+
int64_t offset,
129+
int maxNums,
130+
bool block,
131+
long timeoutMillis) {
132+
long effectiveTimeout = timeoutMillis > 0 ? timeoutMillis : consumer_pull_timeout_millis();
133+
return pull_consumer_impl_->pullOnce(mq, subExpression, offset, maxNums, block, effectiveTimeout);
134+
}
135+
126136
void DefaultLitePullConsumer::setRPCHook(RPCHookPtr rpcHook) {
127137
dynamic_cast<DefaultLitePullConsumerImpl*>(pull_consumer_impl_.get())->setRPCHook(rpcHook);
128138
}

0 commit comments

Comments
 (0)