diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java index c462dd1241c..fba8473e8cb 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java @@ -629,6 +629,10 @@ public SendResult sendMessage( switch (communicationMode) { case ONEWAY: this.remotingClient.invokeOneway(addr, request, timeoutMillis); + if (instance != null) { + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), System.currentTimeMillis() - beginStartTime); + } return null; case ASYNC: final AtomicInteger times = new AtomicInteger(); @@ -644,7 +648,19 @@ public SendResult sendMessage( if (timeoutMillis < costTimeSync) { throw new RemotingTooMuchRequestException("sendMessage call timeout"); } - return this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); + try { + SendResult result = this.sendMessageSync(addr, brokerName, msg, timeoutMillis - costTimeSync, request); + if (instance != null) { + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), System.currentTimeMillis() - beginStartTime); + } + return result; + } catch (Exception e) { + if (instance != null) { + instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1); + } + throw e; + } default: assert false; break; @@ -709,6 +725,10 @@ public void operationSucceed(RemotingCommand response) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true); + if (instance != null) { + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), cost); + } return; } @@ -726,6 +746,10 @@ public void operationSucceed(RemotingCommand response) { } producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, false, true); + if (instance != null) { + instance.getProducerStatsManager().incSendTimes(msg.getTopic(), 1); + instance.getProducerStatsManager().incSendRT(msg.getTopic(), cost); + } } catch (Exception e) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, @@ -737,6 +761,9 @@ public void operationSucceed(RemotingCommand response) { public void operationFail(Throwable throwable) { producer.updateFaultItem(brokerName, System.currentTimeMillis() - beginStartTime, true, true); long cost = System.currentTimeMillis() - beginStartTime; + if (instance != null) { + instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1); + } if (throwable instanceof RemotingSendRequestException) { MQClientException ex = new MQClientException("send request failed", throwable); onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, @@ -756,6 +783,9 @@ public void operationFail(Throwable throwable) { } catch (Exception ex) { long cost = System.currentTimeMillis() - beginStartTime; producer.updateFaultItem(brokerName, cost, true, false); + if (instance != null) { + instance.getProducerStatsManager().incSendFailedTimes(msg.getTopic(), 1); + } onExceptionImpl(brokerName, msg, timeoutMillis - cost, request, sendCallback, topicPublishInfo, instance, retryTimesWhenSendFailed, times, ex, context, true, producer); } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java index eba654c22d0..beb7943b05e 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java @@ -57,6 +57,7 @@ import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.stat.ConsumerStatsManager; +import org.apache.rocketmq.client.stat.ProducerStatsManager; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.ServiceState; @@ -137,6 +138,7 @@ public Thread newThread(Runnable r) { private final PullMessageService pullMessageService; private final RebalanceService rebalanceService; private final DefaultMQProducer defaultMQProducer; + private final ProducerStatsManager producerStatsManager; private final ConsumerStatsManager consumerStatsManager; private final AtomicLong sendHeartbeatTimesTotal = new AtomicLong(0); private ServiceState serviceState = ServiceState.CREATE_JUST; @@ -214,6 +216,7 @@ public void onChannelActive(String remoteAddr, Channel channel) { this.defaultMQProducer = new DefaultMQProducer(MixAll.CLIENT_INNER_PRODUCER_GROUP); this.defaultMQProducer.resetClientConfig(clientConfig); + this.producerStatsManager = new ProducerStatsManager(this.scheduledExecutorService); this.consumerStatsManager = new ConsumerStatsManager(this.scheduledExecutorService); log.info("Created a new client Instance, InstanceIndex:{}, ClientID:{}, ClientConfig:{}, ClientVersion:{}, SerializerType:{}", @@ -1387,6 +1390,10 @@ public ConsumerStatsManager getConsumerStatsManager() { return consumerStatsManager; } + public ProducerStatsManager getProducerStatsManager() { + return producerStatsManager; + } + public NettyClientConfig getNettyClientConfig() { return nettyClientConfig; } diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java index 15264f0e503..7ea03b8ea7c 100644 --- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java +++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java @@ -1061,6 +1061,10 @@ private SendResult sendKernelImpl(final Message msg, requestHeader, timeout - costTimeSync, communicationMode, + null, + null, + this.mQClientFactory, + 0, context, this); break; diff --git a/client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java b/client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java new file mode 100644 index 00000000000..20cbaa4c4c1 --- /dev/null +++ b/client/src/main/java/org/apache/rocketmq/client/stat/ProducerStatsManager.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.client.stat; + +import java.util.concurrent.ScheduledExecutorService; +import org.apache.rocketmq.common.stats.StatsItemSet; +import org.apache.rocketmq.logging.org.slf4j.Logger; +import org.apache.rocketmq.logging.org.slf4j.LoggerFactory; + +public class ProducerStatsManager { + private static final Logger log = LoggerFactory.getLogger(ProducerStatsManager.class); + + private static final String TOPIC_SEND_OK_TPS = "SEND_OK_TPS"; + private static final String TOPIC_SEND_FAILED_TPS = "SEND_FAILED_TPS"; + private static final String TOPIC_SEND_RT = "SEND_RT"; + + private final StatsItemSet topicSendOKTPS; + private final StatsItemSet topicSendFailedTPS; + private final StatsItemSet topicSendRT; + + public ProducerStatsManager(final ScheduledExecutorService scheduledExecutorService) { + this.topicSendOKTPS = new StatsItemSet(TOPIC_SEND_OK_TPS, scheduledExecutorService, log); + this.topicSendFailedTPS = new StatsItemSet(TOPIC_SEND_FAILED_TPS, scheduledExecutorService, log); + this.topicSendRT = new StatsItemSet(TOPIC_SEND_RT, scheduledExecutorService, log); + } + + public void start() { + } + + public void shutdown() { + } + + public void incSendTimes(final String topic, final int times) { + this.topicSendOKTPS.addValue(topic, times, 1); + } + + public void incSendFailedTimes(final String topic, final int times) { + this.topicSendFailedTPS.addValue(topic, times, 1); + } + + public void incSendRT(final String topic, final long rt) { + this.topicSendRT.addValue(topic, (int)rt, 1); + } + + public StatsItemSet getTopicSendOKTPS() { + return topicSendOKTPS; + } + + public StatsItemSet getTopicSendFailedTPS() { + return topicSendFailedTPS; + } + + public StatsItemSet getTopicSendRT() { + return topicSendRT; + } +} diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 33cf0df390d..d5ed046d9b6 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -117,8 +117,6 @@ public void init() throws Exception { producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java index c0eb8568dc6..389cf339040 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithOpenTracingTest.java @@ -103,8 +103,6 @@ public void init() throws Exception { producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java index ed680d8e6cf..a3670f559b9 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/DefaultMQProducerWithTraceTest.java @@ -114,8 +114,6 @@ public void init() throws Exception { producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java index 5d4b81d16db..8416f21c0f4 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithOpenTracingTest.java @@ -121,8 +121,6 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { producer.getDefaultMQProducerImpl().getMqClientFactory().registerProducer(producerGroupTemp, producer.getDefaultMQProducerImpl()); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK)); diff --git a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java index 9f6036153bc..5d0b7385598 100644 --- a/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/trace/TransactionMQProducerWithTraceTest.java @@ -137,8 +137,6 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) { hooks.add(endTransactionHook); fieldHooks.set(producer.getDefaultMQProducerImpl(), hooks); - when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), - nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))).thenCallRealMethod(); when(mQClientAPIImpl.sendMessage(anyString(), anyString(), any(Message.class), any(SendMessageRequestHeader.class), anyLong(), any(CommunicationMode.class), nullable(SendCallback.class), nullable(TopicPublishInfo.class), nullable(MQClientInstance.class), anyInt(), nullable(SendMessageContext.class), any(DefaultMQProducerImpl.class))) .thenReturn(createSendResult(SendStatus.SEND_OK));