Skip to content

Commit f7c0a1a

Browse files
authored
feat(trace): add message trace shift for C style apis. (#281)
1 parent 3bb810a commit f7c0a1a

9 files changed

+43
-3
lines changed

include/CCommon.h

+1
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ typedef enum _CLogLevel_ {
8383
#endif
8484

8585
typedef enum _CMessageModel_ { BROADCASTING, CLUSTERING } CMessageModel;
86+
typedef enum _CTraceModel_ { OPEN, CLOSE } CTraceModel;
8687

8788
#ifdef __cplusplus
8889
}

include/CProducer.h

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ ROCKETMQCLIENT_API int SetProducerLogLevel(CProducer* producer, CLogLevel level)
6363
ROCKETMQCLIENT_API int SetProducerSendMsgTimeout(CProducer* producer, int timeout);
6464
ROCKETMQCLIENT_API int SetProducerCompressLevel(CProducer* producer, int level);
6565
ROCKETMQCLIENT_API int SetProducerMaxMessageSize(CProducer* producer, int size);
66+
ROCKETMQCLIENT_API int SetProducerMessageTrace(CProducer* consumer, CTraceModel openTrace);
6667

6768
ROCKETMQCLIENT_API int SendMessageSync(CProducer* producer, CMessage* msg, CSendResult* result);
6869
ROCKETMQCLIENT_API int SendBatchMessage(CProducer* producer, CBatchMessage* msg, CSendResult* result);

include/CPushConsumer.h

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ ROCKETMQCLIENT_API int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLeve
5959
ROCKETMQCLIENT_API int SetPushConsumerMessageModel(CPushConsumer* consumer, CMessageModel messageModel);
6060
ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSize(CPushConsumer* consumer, int maxCacheSize);
6161
ROCKETMQCLIENT_API int SetPushConsumerMaxCacheMessageSizeInMb(CPushConsumer* consumer, int maxCacheSizeInMb);
62+
ROCKETMQCLIENT_API int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace);
6263

6364
#ifdef __cplusplus
6465
}

include/TransactionMQProducer.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,8 @@ class ROCKETMQCLIENT_API TransactionMQProducer {
7979
void setLogLevel(elogLevel inputLevel);
8080
elogLevel getLogLevel();
8181
void setLogFileSizeAndNum(int fileNum, long perFileSize); // perFileSize is MB unit
82-
82+
void setMessageTrace(bool messageTrace);
83+
bool getMessageTrace() const;
8384
std::shared_ptr<TransactionListener> getTransactionListener();
8485
void setTransactionListener(TransactionListener* listener);
8586
TransactionSendResult sendMessageInTransaction(MQMessage& msg, void* arg);

src/extern/CProducer.cpp

+18
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,24 @@ int SetProducerMaxMessageSize(CProducer* producer, int size) {
797797
}
798798
return OK;
799799
}
800+
int SetProducerMessageTrace(CProducer* producer, CTraceModel openTrace) {
801+
if (producer == NULL) {
802+
return NULL_POINTER;
803+
}
804+
DefaultProducer* defaultMQProducer = (DefaultProducer*)producer;
805+
bool messageTrace = openTrace == OPEN ? true : false;
806+
try {
807+
if (CAPI_C_PRODUCER_TYPE_TRANSACTION == defaultMQProducer->producerType) {
808+
defaultMQProducer->innerTransactionProducer->setMessageTrace(messageTrace);
809+
} else {
810+
defaultMQProducer->innerProducer->setMessageTrace(messageTrace);
811+
}
812+
} catch (exception& e) {
813+
MQClientErrorContainer::setErr(string(e.what()));
814+
return PRODUCER_START_FAILED;
815+
}
816+
return OK;
817+
}
800818
#ifdef __cplusplus
801819
};
802820
#endif

src/extern/CPushConsumer.cpp

+8-1
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,14 @@ int SetPushConsumerLogLevel(CPushConsumer* consumer, CLogLevel level) {
296296
((DefaultMQPushConsumer*)consumer)->setLogLevel((elogLevel)level);
297297
return OK;
298298
}
299-
299+
int SetPushConsumerMessageTrace(CPushConsumer* consumer, CTraceModel openTrace) {
300+
if (consumer == NULL) {
301+
return NULL_POINTER;
302+
}
303+
bool messageTrace = openTrace == OPEN ? true : false;
304+
((DefaultMQPushConsumer*)consumer)->setMessageTrace(messageTrace);
305+
return OK;
306+
}
300307
#ifdef __cplusplus
301308
};
302309
#endif

src/producer/TransactionMQProducer.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,12 @@ void TransactionMQProducer::setUnitName(std::string unitName) {
160160
const std::string& TransactionMQProducer::getUnitName() const {
161161
return impl->getUnitName();
162162
}
163-
163+
void TransactionMQProducer::setMessageTrace(bool messageTrace) {
164+
impl->setMessageTrace(messageTrace);
165+
}
166+
bool TransactionMQProducer::getMessageTrace() const {
167+
return impl->getMessageTrace();
168+
}
164169
std::shared_ptr<TransactionListener> TransactionMQProducer::getTransactionListener() {
165170
return impl->getTransactionListener();
166171
}

test/src/extern/CProducerTest.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,10 @@ TEST(cProducer, info) {
228228
EXPECT_EQ(SetProducerSessionCredentials(cProducer, "accessKey", "secretKey", "channel"), OK);
229229
SessionCredentials sessionCredentials = defaultMQProducer->getSessionCredentials();
230230
EXPECT_EQ(sessionCredentials.getAccessKey(), "accessKey");
231+
232+
EXPECT_EQ(SetProducerMessageTrace(cProducer, OPEN), OK);
233+
EXPECT_EQ(defaultMQProducer->getMessageTrace(), true);
234+
231235
Mock::AllowLeak(defaultMQProducer);
232236
}
233237

test/src/extern/CPushConsumerTest.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,8 @@ TEST(cPushComsumer, info) {
118118
EXPECT_EQ(SetPushConsumerMessageModel(cpushConsumer, BROADCASTING), OK);
119119
EXPECT_EQ(mqPushConsumer->getMessageModel(), MessageModel::BROADCASTING);
120120

121+
EXPECT_EQ(SetPushConsumerMessageTrace(cpushConsumer, CLOSE), OK);
122+
EXPECT_EQ(mqPushConsumer->getMessageTrace(), false);
121123
Mock::AllowLeak(mqPushConsumer);
122124
}
123125

0 commit comments

Comments
 (0)