Skip to content

Commit 2e70aaf

Browse files
committed
add producer stats manager
1 parent a3afb05 commit 2e70aaf

9 files changed

+113
-11
lines changed

Diff for: client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java

+31-1
Original file line numberDiff line numberDiff line change
@@ -629,6 +629,10 @@ public SendResult sendMessage(
629629
switch (communicationMode) {
630630
case ONEWAY:
631631
this.remotingClient.invokeOneway(addr, request, timeoutMillis);
632+
if (instance != null) {
633+
instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1);
634+
instance.getProducerStatsManager().incSendRT(msg.getTopic(), System.currentTimeMillis() - beginStartTime);
635+
}
632636
return null;
633637
case ASYNC:
634638
final AtomicInteger times = new AtomicInteger();
@@ -644,7 +648,19 @@ public SendResult sendMessage(
644648
if (timeoutMillis < costTimeSync) {
645649
throw new RemotingTooMuchRequestException("sendMessage call timeout");
646650
}
647-
return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
651+
try {
652+
SendResult result = this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request);
653+
if (instance != null) {
654+
instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1);
655+
instance.getProducerStatsManager().incSendRT(msg.getTopic(), System.currentTimeMillis() - beginStartTime);
656+
}
657+
return result;
658+
} catch (Exception e) {
659+
if (instance != null) {
660+
instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1);
661+
}
662+
throw e;
663+
}
648664
default:
649665
assert false;
650666
break;
@@ -709,6 +725,10 @@ public void operationSucceed(RemotingCommand response) {
709725
}
710726

711727
producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true);
728+
if (instance != null) {
729+
instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1);
730+
instance.getProducerStatsManager().incSendRT(msg.getTopic(), cost);
731+
}
712732
return;
713733
}
714734

@@ -726,6 +746,10 @@ public void operationSucceed(RemotingCommand response) {
726746
}
727747

728748
producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true);
749+
if (instance != null) {
750+
instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1);
751+
instance.getProducerStatsManager().incSendRT(msg.getTopic(), cost);
752+
}
729753
} catch (Exception e) {
730754
producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true);
731755
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
@@ -737,6 +761,9 @@ public void operationSucceed(RemotingCommand response) {
737761
public void operationFail(Throwable throwable) {
738762
producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true);
739763
long cost = System.currentTimeMillis() - beginStartTime;
764+
if (instance != null) {
765+
instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1);
766+
}
740767
if (throwable instanceof RemotingSendRequestException) {
741768
MQClientException ex = new MQClientException("send request failed", throwable);
742769
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
@@ -756,6 +783,9 @@ public void operationFail(Throwable throwable) {
756783
} catch (Exception ex) {
757784
long cost = System.currentTimeMillis() - beginStartTime;
758785
producer.updateFaultItem(brokerName, cost, true, false);
786+
if (instance != null) {
787+
instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1);
788+
}
759789
onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance,
760790
retryTimesWhenSendFailed, times, ex, context, true, producer);
761791
}

Diff for: client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

+7
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import org.apache.rocketmq.client.impl.producer.TopicPublishInfo;
5858
import org.apache.rocketmq.client.producer.DefaultMQProducer;
5959
import org.apache.rocketmq.client.stat.ConsumerStatsManager;
60+
import org.apache.rocketmq.client.stat.ProducerStatsManager;
6061
import org.apache.rocketmq.common.MQVersion;
6162
import org.apache.rocketmq.common.MixAll;
6263
import org.apache.rocketmq.common.ServiceState;
@@ -137,6 +138,7 @@ public Thread newThread(Runnable r) {
137138
private final PullMessageService pullMessageService;
138139
private final RebalanceService rebalanceService;
139140
private final DefaultMQProducer defaultMQProducer;
141+
private final ProducerStatsManager producerStatsManager;
140142
private final ConsumerStatsManager consumerStatsManager;
141143
private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0);
142144
private ServiceState serviceState = ServiceState.CREATE_JUST;
@@ -214,6 +216,7 @@ public void onChannelActive(String remoteAddr, Channel channel) {
214216
this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP);
215217
this.defaultMQProducer.resetClientConfig(clientConfig);
216218

219+
this.producerStatsManager = new ProducerStatsManager(this.scheduledExecutorService);
217220
this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService);
218221

219222
log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}",
@@ -1387,6 +1390,10 @@ public ConsumerStatsManager getConsumerStatsManager() {
13871390
return consumerStatsManager;
13881391
}
13891392

1393+
public ProducerStatsManager getProducerStatsManager() {
1394+
return producerStatsManager;
1395+
}
1396+
13901397
public NettyClientConfig getNettyClientConfig() {
13911398
return nettyClientConfig;
13921399
}

Diff for: client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java

+4
Original file line numberDiff line numberDiff line change
@@ -1061,6 +1061,10 @@ private SendResult sendKernelImpl(final Message msg,
10611061
requestHeader,
10621062
timeout - costTimeSync,
10631063
communicationMode,
1064+
null,
1065+
null,
1066+
this.mQClientFactory,
1067+
0,
10641068
context,
10651069
this);
10661070
break;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.rocketmq.client.stat;
19+
20+
import java.util.concurrent.ScheduledExecutorService;
21+
import org.apache.rocketmq.common.stats.StatsItemSet;
22+
import org.apache.rocketmq.logging.org.slf4j.Logger;
23+
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
24+
25+
public class ProducerStatsManager {
26+
private static final Logger log = LoggerFactory.getLogger(ProducerStatsManager.class);
27+
28+
private static final String TOPIC_SEND_OK_TPS = "SEND_OK_TPS";
29+
private static final String TOPIC_SEND_FAILED_TPS = "SEND_FAILED_TPS";
30+
private static final String TOPIC_SEND_RT = "SEND_RT";
31+
32+
private final StatsItemSet topicSendOKTPS;
33+
private final StatsItemSet topicSendFailedTPS;
34+
private final StatsItemSet topicSendRT;
35+
36+
public ProducerStatsManager(final ScheduledExecutorService scheduledExecutorService) {
37+
this.topicSendOKTPS = new StatsItemSet(TOPIC_SEND_OK_TPS, scheduledExecutorService, log);
38+
this.topicSendFailedTPS = new StatsItemSet(TOPIC_SEND_FAILED_TPS, scheduledExecutorService, log);
39+
this.topicSendRT = new StatsItemSet(TOPIC_SEND_RT, scheduledExecutorService, log);
40+
}
41+
42+
public void start() {
43+
}
44+
45+
public void shutdown() {
46+
}
47+
48+
public void incSendTimes(final String topic, final int times) {
49+
this.topicSendOKTPS.addValue(topic, times, 1);
50+
}
51+
52+
public void incSendFailedTimes(final String topic, final int times) {
53+
this.topicSendFailedTPS.addValue(topic, times, 1);
54+
}
55+
56+
public void incSendRT(final String topic, final long rt) {
57+
this.topicSendRT.addValue(topic, (int)rt, 1);
58+
}
59+
60+
public StatsItemSet getTopicSendOKTPS() {
61+
return topicSendOKTPS;
62+
}
63+
64+
public StatsItemSet getTopicSendFailedTPS() {
65+
return topicSendFailedTPS;
66+
}
67+
68+
public StatsItemSet getTopicSendRT() {
69+
return topicSendRT;
70+
}
71+
}

Diff for: client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ public void init() throws Exception {
117117

118118
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
119119

120-
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
121-
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
122120
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
123121
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
124122
.thenReturn(createSendResult(SendStatus.SEND_OK));

Diff for: client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,6 @@ public void init() throws Exception {
103103

104104
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
105105

106-
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
107-
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
108106
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
109107
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
110108
.thenReturn(createSendResult(SendStatus.SEND_OK));

Diff for: client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -114,8 +114,6 @@ public void init() throws Exception {
114114

115115
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
116116

117-
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
118-
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
119117
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
120118
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
121119
.thenReturn(createSendResult(SendStatus.SEND_OK));

Diff for: client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -121,8 +121,6 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
121121

122122
producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl());
123123

124-
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
125-
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
126124
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
127125
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
128126
.thenReturn(createSendResult(SendStatus.SEND_OK));

Diff for: client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,6 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
137137
hooks.add(endTransactionHook);
138138
fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks);
139139

140-
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
141-
nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod();
142140
when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class),
143141
nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class)))
144142
.thenReturn(createSendResult(SendStatus.SEND_OK));

0 commit comments

Comments
 (0)